C++11并发,有锁队列和无锁队列

Posted 帝王铠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++11并发,有锁队列和无锁队列相关的知识,希望对你有一定的参考价值。

有锁队列

#include <condition_variable>
#include <memory>
#include <mutex>
#include <utility>
/**
 * @brief Unbounded FIFO queue guarded by two mutexes (one for each end).
 *
 * The queue is a singly linked list that always ends in an empty dummy node:
 * `head` owns the first node and `tail` points at the dummy. The queue is
 * empty exactly when `head.get() == tail`. `push()` only takes `tail_mutex`
 * to modify the list; the pop operations take `head_mutex` and look at the
 * tail pointer through `get_tail()`.
 *
 * @tparam T element type; must be move-constructible, and move-assignable
 *           for the overloads that write into a `T&`.
 */
template<typename T>
class threadsafe_queue
{
private:
	/// List node. The last node of the list never carries data.
	struct node
	{
		std::shared_ptr<T> data;
		std::unique_ptr<node> next;
	};

	std::mutex head_mutex;
	std::unique_ptr<node> head;
	std::mutex tail_mutex;
	node* tail;
	std::condition_variable data_cond;

	/// Reads the current tail pointer under `tail_mutex`.
	node* get_tail()
	{
		std::lock_guard<std::mutex> tail_lock(tail_mutex);
		return tail;
	}

	/// Unlinks and returns the first node. Caller must hold `head_mutex`
	/// and must have checked that the queue is not empty.
	std::unique_ptr<node> pop_head()
	{
		std::unique_ptr<node> old_head = std::move(head);
		head = std::move(old_head->next);
		return old_head;
	}

	/// Blocks until the queue is non-empty and returns the still-held
	/// lock on `head_mutex`.
	std::unique_lock<std::mutex> wait_for_data()
	{
		std::unique_lock<std::mutex> head_lock(head_mutex);
		data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });
		// A local returned by value is moved implicitly; no std::move needed.
		return head_lock;
	}

	/// Blocking pop that returns the unlinked node.
	std::unique_ptr<node> wait_pop_head()
	{
		std::unique_lock<std::mutex> head_lock(wait_for_data());
		return pop_head();
	}

	/// Blocking pop that moves the element into `value` before unlinking,
	/// so a throwing move assignment leaves the queue unchanged.
	std::unique_ptr<node> wait_pop_head(T& value)
	{
		std::unique_lock<std::mutex> head_lock(wait_for_data());
		value = std::move(*head->data);
		return pop_head();
	}

	/// Non-blocking pop; returns an empty pointer when the queue is empty.
	std::unique_ptr<node> try_pop_head()
	{
		std::lock_guard<std::mutex> head_lock(head_mutex);
		if (head.get() == get_tail())
		{
			return std::unique_ptr<node>();
		}
		return pop_head();
	}

	/// Non-blocking pop into `value`; returns an empty pointer (and leaves
	/// `value` untouched) when the queue is empty.
	std::unique_ptr<node> try_pop_head(T& value)
	{
		std::lock_guard<std::mutex> head_lock(head_mutex);
		if (head.get() == get_tail())
		{
			return std::unique_ptr<node>();
		}
		value = std::move(*head->data);
		return pop_head();
	}

public:
	/// Creates an empty queue consisting of the dummy node only.
	threadsafe_queue() :
		head(new node), tail(head.get())
	{
	}

	threadsafe_queue(const threadsafe_queue& other) = delete;
	threadsafe_queue& operator=(const threadsafe_queue& other) = delete;

	/// Releases the nodes one at a time. The implicit destructor would
	/// destroy the `unique_ptr` chain recursively, one stack frame per
	/// element, which can exhaust the stack for a long queue.
	~threadsafe_queue()
	{
		while (head)
		{
			std::unique_ptr<node> next = std::move(head->next);
			head = std::move(next);
		}
	}

	/// Blocks until an element is available and returns it.
	std::shared_ptr<T> wait_and_pop()
	{
		std::unique_ptr<node> const old_head = wait_pop_head();
		return old_head->data;
	}

	/// Blocks until an element is available and moves it into `value`.
	void wait_and_pop(T& value)
	{
		std::unique_ptr<node> const old_head = wait_pop_head(value);
	}

	/// Returns the front element, or an empty pointer if the queue is empty.
	std::shared_ptr<T> try_pop()
	{
		std::unique_ptr<node> const old_head = try_pop_head();
		return old_head ? old_head->data : std::shared_ptr<T>();
	}

	/// Moves the front element into `value`.
	/// @return true if an element was removed, false if the queue was empty.
	bool try_pop(T& value)
	{
		std::unique_ptr<node> const old_head = try_pop_head(value);
		// unique_ptr's conversion to bool is explicit, so compare explicitly.
		return old_head != nullptr;
	}

	/// @return true if the queue held no element at the time of the call.
	bool empty()
	{
		std::lock_guard<std::mutex> head_lock(head_mutex);
		return (head.get() == get_tail());
	}

	/// Appends `new_value` and wakes one waiting consumer.
	void push(T new_value)
	{
		// Allocate outside the lock to keep the critical section short.
		std::shared_ptr<T> new_data(
			std::make_shared<T>(std::move(new_value)));
		std::unique_ptr<node> p(new node);
		{
			std::lock_guard<std::mutex> tail_lock(tail_mutex);
			// The old dummy receives the data; `p` becomes the new dummy.
			tail->data = std::move(new_data);
			node* const new_tail = p.get();
			tail->next = std::move(p);
			tail = new_tail;
		}
		{
			// Waiters evaluate their predicate while holding head_mutex.
			// Passing through that mutex before notifying guarantees a
			// waiter that saw the queue empty is already blocked in wait(),
			// so the notification below cannot be lost.
			std::lock_guard<std::mutex> head_lock(head_mutex);
		}
		data_cond.notify_one();
	}
};

无锁队列

已在 VS2017 和 VS2019 下编译通过(2019.9.11)。
采用内部、外部两个引用计数来管理节点内存的回收。

#define _ENABLE_ATOMIC_ALIGNMENT_FIX 1
#include <iostream>
#include <memory>
#include <mutex>
#include <atomic>
#include <thread>
#include <condition_variable>
/**
 * @brief FIFO queue for multiple producers and consumers, synchronised with
 *        atomic compare-exchange operations instead of mutexes.
 *
 * The queue is a linked list that always ends in an empty dummy node.
 * Node lifetime is tracked with two counters:
 *  - an external count stored next to every pointer that leads to a node
 *    (`head`, `tail`, or a predecessor's `next`). A thread bumps it before
 *    dereferencing the pointer. Every such pointer starts at 1.
 *  - an internal count inside the node, plus `external_counters`, the number
 *    of counted pointers that may still lead to the node (at most 2).
 * A node is deleted once both `internal_count` and `external_counters`
 * reach zero.
 *
 * NOTE(review): on 64-bit targets `counted_node_ptr` is 16 bytes, so
 * `std::atomic<counted_node_ptr>` may not be lock-free, and GCC-based
 * toolchains typically need `-latomic` at link time — confirm for the
 * toolchain in use.
 *
 * @tparam T element type; must be move-constructible.
 */
template<typename T>
class lock_free_queue
{
private:
	struct node;

	/// Pointer to a node together with the external reference count that
	/// belongs to this particular pointer.
	struct counted_node_ptr
	{
		std::uint64_t external_count = 0;
		node* ptr = nullptr;
	};

	std::atomic<counted_node_ptr> head;
	std::atomic<counted_node_ptr> tail;

	/// Per-node bookkeeping, packed into 32 bits so it can be updated with
	/// a single compare-exchange.
	struct node_counter
	{
		std::uint32_t internal_count : 30;
		std::uint32_t external_counters : 2;
	};

	struct node
	{
		std::atomic<T*> data;
		std::atomic<node_counter> count;
		std::atomic<counted_node_ptr> next;

		/// A fresh node carries no data, has no successor, and expects two
		/// counted pointers to reference it during its lifetime.
		node() : data(nullptr)
		{
			node_counter new_count;
			new_count.internal_count = 0;
			new_count.external_counters = 2;
			count.store(new_count);
			counted_node_ptr new_ptr;
			new_ptr.ptr = nullptr;
			new_ptr.external_count = 0;
			next.store(new_ptr);
		}

		/// Drops one reference held by the calling thread and deletes the
		/// node when no reference of either kind remains.
		void release_ref()
		{
			node_counter old_counter =
				count.load(std::memory_order_relaxed);
			node_counter new_counter;
			do
			{
				new_counter = old_counter;
				--new_counter.internal_count;
			} while (!count.compare_exchange_strong(old_counter, new_counter,
				std::memory_order_acquire, std::memory_order_relaxed));
			if (!new_counter.internal_count && !new_counter.external_counters)
			{
				delete this;
			}
		}
	};

	/// Tries to swing `tail` from `old_tail` to `new_tail`. If the tail has
	/// already been moved off this node by another thread, only the
	/// caller's own reference to the old node is released.
	void set_new_tail(counted_node_ptr& old_tail, counted_node_ptr const& new_tail)
	{
		node* const current_tail_ptr = old_tail.ptr;
		// Retry while the CAS fails only because the external count changed.
		while (!tail.compare_exchange_weak(old_tail, new_tail) && old_tail.ptr == current_tail_ptr)
		{
		}
		if (old_tail.ptr == current_tail_ptr)
		{
			free_external_counter(old_tail);
		}
		else
		{
			current_tail_ptr->release_ref();
		}
	}

	/// Atomically increments the external count stored in `counter` and
	/// leaves the resulting value in `old_counter`.
	static void increase_external_count(
		std::atomic<counted_node_ptr>& counter,
		counted_node_ptr& old_counter)
	{
		counted_node_ptr new_counter;
		do
		{
			new_counter = old_counter;
			++new_counter.external_count;
		} while (!counter.compare_exchange_strong(
			old_counter, new_counter,
			std::memory_order_acquire, std::memory_order_relaxed));
		old_counter.external_count = new_counter.external_count;
	}

	/// Retires the counted pointer `old_node_ptr`: folds its external count
	/// into the node's internal count and deletes the node when nothing
	/// references it any more.
	static void free_external_counter(counted_node_ptr& old_node_ptr)
	{
		node* const ptr = old_node_ptr.ptr;
		// -1 for the pointer's initial count of 1, -1 for the caller's own
		// reference; the remainder are other threads still holding the node.
		int const count_increase = static_cast<int>(old_node_ptr.external_count) - 2;
		node_counter old_counter = ptr->count.load(std::memory_order_relaxed);
		node_counter new_counter;
		do
		{
			new_counter = old_counter;
			--new_counter.external_counters;
			new_counter.internal_count += count_increase;
		} while (!ptr->count.compare_exchange_strong(old_counter, new_counter,
			std::memory_order_acquire, std::memory_order_relaxed));
		if (!new_counter.internal_count && !new_counter.external_counters)
		{
			delete ptr;
		}
	}

public:
	/// Creates an empty queue holding only the dummy node.
	lock_free_queue()
	{
		counted_node_ptr newHead;
		newHead.ptr = new node;
		// Every counted pointer starts at 1, matching the links created in
		// push(); free_external_counter() relies on that when it subtracts 2.
		newHead.external_count = 1;
		head.store(newHead);
		tail.store(newHead);
	}

	lock_free_queue(const lock_free_queue&) = delete;
	lock_free_queue& operator=(const lock_free_queue&) = delete;

	/// Frees every node still linked from `head`, including any element
	/// that was never popped. Assumes no other thread is using the queue,
	/// since the reference counts are not consulted here.
	~lock_free_queue()
	{
		node* current = head.load().ptr;
		while (current)
		{
			node* const following = current->next.load().ptr;
			delete current->data.load();
			delete current;
			current = following;
		}
	}

	/// Appends `new_value` to the queue.
	void push(T new_value)
	{
		std::unique_ptr<T> new_data(new T(std::move(new_value)));
		counted_node_ptr new_next;
		new_next.ptr = new node;
		new_next.external_count = 1;
		counted_node_ptr old_tail = tail.load();
		for (;;)
		{
			increase_external_count(tail, old_tail);
			T* old_data = nullptr;
			if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get()))
			{
				// This thread claimed the dummy; link a successor unless a
				// helping thread has already done so.
				counted_node_ptr old_next;
				if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next))
				{
					delete new_next.ptr;
					new_next = old_next;
				}
				set_new_tail(old_tail, new_next);
				new_data.release();
				break;
			}
			else
			{
				// Another thread claimed the dummy; help it by linking the
				// successor and advancing the tail, then retry.
				counted_node_ptr old_next;
				if (old_tail.ptr->next.compare_exchange_strong(old_next, new_next))
				{
					old_next = new_next;
					new_next.ptr = new node;
				}
				set_new_tail(old_tail, old_next);
			}
		}
	}

	/// Removes the front element without blocking.
	/// @return the element, or an empty pointer if the queue was empty.
	std::unique_ptr<T> pop()
	{
		counted_node_ptr old_head = head.load(std::memory_order_relaxed);
		for (;;)
		{
			increase_external_count(head, old_head);
			node* const ptr = old_head.ptr;
			if (ptr == tail.load().ptr)
			{
				// Give back the reference taken above before reporting
				// empty; otherwise the node's counts never balance.
				ptr->release_ref();
				return std::unique_ptr<T>();
			}
			counted_node_ptr next = ptr->next.load();
			if (head.compare_exchange_strong(old_head, next))
			{
				T* const res = ptr->data.exchange(nullptr);
				free_external_counter(old_head);
				return std::unique_ptr<T>(res);
			}
			ptr->release_ref();
		}
	}
};
//测试程序
/// Test consumer: removes `size` elements from `q` and prints each one.
///
/// `pop()` does not block and yields an empty pointer while the queue is
/// empty, so the call is retried (yielding the time slice in between) until
/// an element arrives. Returns only after `size` elements were consumed.
void consumer(lock_free_queue<int>& q, int size)
{
	for (int i = 0; i < size; ++i)
	{
		std::unique_ptr<int> value = q.pop();
		while (!value)
		{
			std::this_thread::yield();
			value = q.pop();
		}
		// Print the element itself, not the address held by the pointer.
		std::cout << "Consumer fetched" << *value << std::endl;
	}
}

/// Test producer: pushes the values 0 .. size-1 into `q`, printing a line
/// after each push.
void producer(lock_free_queue<int>& q, int size)
{
	for (int i = 0; i < size; ++i)
	{
		q.push(i);
		std::cout << "Produced produced" << i << std::endl;
	}
}

int main(int argc, const char** argv) 
	static const int BUFFER_SIZE = 10;
	lock_free_queue<int> q;
	auto start = std::chrono::system_clock::now();
	std::thread c1(consumer, std::ref(q), BUFFER_SIZE);
	std::thread p1(producer, std::ref(q), BUFFER_SIZE);
	c1.join();
	p1.join();
	auto end = std::chrono::system_clock::now();
	auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
	std::cout << elapsed.count() << '\\n';
	return 0;

以上是关于C++11并发,有锁队列和无锁队列的主要内容,如果未能解决你的问题,请参考以下文章

锁CAS操作和无锁队列的实现

c++11 并发队列的生产方案 BlockingConcurrentQueue

如何在无锁并发队列中实现“Front”方法?

线程安全集合

c++11 2线程无锁队列

C ++ 11中无锁的多生产者/消费者队列