C++并发编程:构建线程安全队列(第一部分:粗粒度锁)
引言
在多线程编程中,线程之间的数据共享和通信是一个非常重要的问题。在这篇博客中,我们将讨论如何用C++实现一个基础但非常实用的线程安全队列。这个队列使用粗粒度的互斥锁和条件变量来实现。
线程安全队列的基础实现
下面是基础代码结构:
template <typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> data_queue;
std::condition_variable data_cond;
// ...(省略其余代码)
};
互斥锁和条件变量
-
std::mutex mut
: 用于确保队列操作的线程安全。 -
std::condition_variable data_cond
: 用于阻塞和唤醒等待队列操作的线程。
push方法
void push(T new_value)
{
std::shared_ptr<T> data(std::make_shared<T>(std::move(new_value)));
std::unique_lock lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
这里使用 std::unique_lock
来获取互斥锁,确保数据的线程安全。然后使用 data_cond.notify_one()
来唤醒可能正在等待队列变为非空的线程。
pop方法
对于 pop
,我们有两个版本:
-
wait_and_pop
:等待直到队列非空。 -
try_pop
:尝试弹出,如果队列为空则立即返回。
void wait_and_pop(T& value)
{
std::unique_lock lk(mut);
data_cond.wait(lk, [this] { return !data_queue.empty(); });
value = std::move(*data_queue.front());
data_queue.pop();
}
bool try_pop(T& value)
{
std::unique_lock lk(mut);
if (data_queue.empty()) return false;
value = std::move(*data_queue.front());
data_queue.pop();
return true;
}
在 wait_and_pop
中,我们使用 data_cond.wait()
来阻塞当前线程,直到队列变为非空。
测试
我们使用了一个生产者线程和两个消费者线程进行测试。
// 测试函数
void test_threadsafe_queue()
{
threadsafe_queue<int> tsq;
// 创建一个生产者线程
std::thread producer([&]()
{
for (int i = 0; i < 10; ++i)
{
std::cout << "Pushing " << i << std::endl;
tsq.push(i);
}
});
// 创建两个消费者线程
std::thread consumer1([&]()
{
for (int i = 0; i < 5; ++i)
{
int value;
tsq.wait_and_pop(value);
std::cout << "Consumer 1 popped " << value << std::endl;
}
});
std::thread consumer2([&]()
{
for (int i = 0; i < 5; ++i)
{
int value;
tsq.wait_and_pop(value);
std::cout << "Consumer 2 popped " << value << std::endl;
}
});
// 等待所有线程完成
producer.join();
consumer1.join();
consumer2.join();
}
完整代码
template <typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue() = default;
void wait_and_pop(T& value)
{
std::unique_lock lk(mut);
data_cond.wait(lk, [this] { return !data_queue.empty(); });
value = std::move(*data_queue.front());
data_queue.pop();
}
bool try_pop(T& value)
{
std::unique_lock lk(mut);
if (data_queue.empty()) return false;
value = std::move(*data_queue.front());
data_queue.pop();
return true;
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock lk(mut);
data_cond.wait(lk, [this] { return !data_queue.empty(); });
std::shared_ptr<T> res = data_queue.front();
data_queue.pop();
return res;
}
std::shared_ptr<T> try_pop()
{
std::unique_lock lk(mut);
if (data_queue.empty()) return std::make_shared<T>();
std::shared_ptr<T> res = data_queue.front();
data_queue.pop();
return res;
}
void push(T new_value)
{
std::shared_ptr<T> data(std::make_shared<T>(std::move(new_value)));
std::unique_lock lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
bool empty()
{
std::unique_lock lk(mut);
return data_queue.empty();
}
};
// 测试函数
void test_threadsafe_queue()
{
threadsafe_queue<int> tsq;
// 创建一个生产者线程
std::thread producer([&]()
{
for (int i = 0; i < 10; ++i)
{
std::cout << "Pushing " << i << std::endl;
tsq.push(i);
}
});
// 创建两个消费者线程
std::thread consumer1([&]()
{
for (int i = 0; i < 5; ++i)
{
int value;
tsq.wait_and_pop(value);
std::cout << "Consumer 1 popped " << value << std::endl;
}
});
std::thread consumer2([&]()
{
for (int i = 0; i < 5; ++i)
{
int value;
tsq.wait_and_pop(value);
std::cout << "Consumer 2 popped " << value << std::endl;
}
});
// 等待所有线程完成
producer.join();
consumer1.join();
consumer2.join();
}
int main()
{
test_threadsafe_queue();
return 0;
}
总结
这篇博客中,我们简要介绍了如何使用C++的标准库来实现一个基础的线程安全队列。虽然我们使用了粗粒度的互斥锁,但这个实现是非常实用和直观的。在下一篇博客中,我们将讨论如何进行优化,以提高性能和效率。