c++11 并发队列的生产方案 BlockingConcurrentQueue
Posted BBinChina
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了c++11 并发队列的生产方案 BlockingConcurrentQueue相关的知识,希望对你有一定的参考价值。
这是我在生产中使用到的一个c++11 实现的无锁队列,有以下特性:
1、线程安全的无锁队列
2、支持c++11 的move语义,优化对象拷贝性能
3、模板化
4、可预分配内存、也可动态分配
5、支持批量处理
6、包含阻塞队列
7、异常安全
demo
将github里的文件 Blockingconcurrentqueue.h 跟 Concurrentqueue.h 加载到项目中
#include "Blockingconcurrentqueue.h"
//声明队列
moodycamel::BlockingConcurrentQueue<T> g_myQueue;
//工作函数监听队列消息
void concumer() {
while (!_bStop) {
T t;
g_myQueue.wait_dequeue(T);
}
}
void producer() {
T t;
g_myQueue.enqueue(std::move(t));
}
原理
CAS保证线程安全
首先,线程安全通常采用互斥量,或者条件变量,本质就是锁机制,或者前文介绍过的Dirsuptor采用的CAS机制,这两种机制其实也是我们常说的悲观锁、乐观锁模式。
互斥量:需要获取到锁才能进入。
CAS:通常可能一次比对就可以进入逻辑块,或者多试几次进行更新值。
stl提供的cas操作:
bool atomic_compare_exchange_weak (atomic* obj, T* expected, T val);
首先函数会将 obj 与 expected 的内容作比较:
如果相等,那么将交换 obj 和 val 的值,并返回 true。
如果不相等,则什么也不做,之后返回 false。
ConcurrentQueue 实现的线程安全以及高性能所使用到的也便是CAS机制。
c++11 move语义
// Enqueues a single item (by moving it, if possible).
// Allocates memory if required. Only fails if memory allocation fails (or implicit
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Thread-safe.
inline bool enqueue(T&& item)
{
if ((details::likely)(inner.enqueue(std::move(item)))) {
sema->signal();
return true;
}
return false;
}
工作线程放入一个元素时,采用move语义减少对象创建拷贝
模板化内存管理
template<AllocationMode canAlloc, typename U>
inline bool inner_enqueue(U&& element)
{
auto producer = get_or_add_implicit_producer();
return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
}
当不同的生产者生产元素时,每个生产者在总队列里时采用子队列形式隔离的,每个生产者的元素采用块的形式存储而非链表方式,提供存储性能,但这样也造成了一个问题:当多个生产者间的元素有顺序要求时,整体队列无法做到。比如 a生产a消息,b生产b消息,宏观上可能ab有先后顺序,但是消费时,因为a跟b时存储在不同的块,出队的顺序独立,所以有可能存在ba的出队顺序。在生产过程中需要区分好队列的生产跟消费逻辑
看过STL的源码,大家应该队AllocationMode会有大概的认知吧。
以上是关于c++11 并发队列的生产方案 BlockingConcurrentQueue的主要内容,如果未能解决你的问题,请参考以下文章