c++11 并发队列的生产方案 BlockingConcurrentQueue

Posted BBinChina

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了c++11 并发队列的生产方案 BlockingConcurrentQueue相关的知识,希望对你有一定的参考价值。

ConcurrentQueue

这是我在生产中使用到的一个c++11 实现的无锁队列,有以下特性:

1、线程安全的无锁队列
2、支持c++11 的move语义,优化对象拷贝性能
3、模板化
4、可预分配内存、也可动态分配
5、支持批量处理
6、包含阻塞队列
7、异常安全

demo

将github里的文件 Blockingconcurrentqueue.h 跟 Concurrentqueue.h 加载到项目中

#include "Blockingconcurrentqueue.h"
//声明队列
moodycamel::BlockingConcurrentQueue<T> g_myQueue;

//工作函数监听队列消息
void concumer() {
	while (!_bStop) {
		T t;
		g_myQueue.wait_dequeue(T);
	}
}

void producer() {
	T t;
    g_myQueue.enqueue(std::move(t));
}

原理

CAS保证线程安全

首先,线程安全通常采用互斥量,或者条件变量,本质就是锁机制,或者前文介绍过的Dirsuptor采用的CAS机制,这两种机制其实也是我们常说的悲观锁、乐观锁模式。
互斥量:需要获取到锁才能进入。
CAS:通常可能一次比对就可以进入逻辑块,或者多试几次进行更新值。

stl提供的cas操作:

bool atomic_compare_exchange_weak (atomic* obj, T* expected, T val);

首先函数会将 obj 与 expected 的内容作比较:

如果相等,那么将交换 obj 和 val 的值,并返回 true。
如果不相等,则什么也不做,之后返回 false。

ConcurrentQueue 实现的线程安全以及高性能所使用到的也便是CAS机制。

c++11 move语义

// Enqueues a single item (by moving it, if possible).
	// Allocates memory if required. Only fails if memory allocation fails (or implicit
	// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
	// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Thread-safe.
	inline bool enqueue(T&& item)
	{
		if ((details::likely)(inner.enqueue(std::move(item)))) {
			sema->signal();
			return true;
		}
		return false;
	}

工作线程放入一个元素时,采用move语义减少对象创建拷贝

模板化内存管理

template<AllocationMode canAlloc, typename U>
	inline bool inner_enqueue(U&& element)
	{
		auto producer = get_or_add_implicit_producer();
		return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
	}

当不同的生产者生产元素时,每个生产者在总队列里时采用子队列形式隔离的,每个生产者的元素采用块的形式存储而非链表方式,提供存储性能,但这样也造成了一个问题:当多个生产者间的元素有顺序要求时,整体队列无法做到。比如 a生产a消息,b生产b消息,宏观上可能ab有先后顺序,但是消费时,因为a跟b时存储在不同的块,出队的顺序独立,所以有可能存在ba的出队顺序。在生产过程中需要区分好队列的生产跟消费逻辑

看过STL的源码,大家应该队AllocationMode会有大概的认知吧。

以上是关于c++11 并发队列的生产方案 BlockingConcurrentQueue的主要内容,如果未能解决你的问题,请参考以下文章

综合运用: C++11 多线程下生产者消费者模型详解(转)

C++11并发,有锁队列和无锁队列

并发编程-J.U.C组件拓展之阻塞队列BlockingQueue

并发无锁队列学习(单生产者单消费者模型)

161212并发编程中的关于队列

java并发:多生产者一消费者