C++11并发,有锁队列和无锁队列
Posted 帝王铠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++11并发,有锁队列和无锁队列相关的知识,希望对你有一定的参考价值。
有锁队列
#include <condition_variable>
#include <memory>
#include <mutex>
/// Unbounded FIFO queue guarded by two mutexes: one for the head (pop side)
/// and one for the tail (push side).
///
/// The list always ends in an empty dummy node, so `head.get() == tail`
/// means "the queue is empty". Values are stored as `std::shared_ptr<T>` that
/// are allocated in push() before any lock is taken.
///
/// @tparam T element type; must be move- or copy-constructible, and
///           move-assignable for the `T&` overloads of the pop functions.
template<typename T>
class threadsafe_queue
{
private:
    /// Singly linked list node; `next` owns the rest of the list.
    struct node
    {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };

    std::mutex head_mutex;
    std::unique_ptr<node> head;
    std::mutex tail_mutex;
    node* tail;
    std::condition_variable data_cond;

    /// Reads the current tail pointer under tail_mutex.
    node* get_tail()
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }

    /// Unlinks and returns the head node.
    /// Every caller in this class holds head_mutex and has already checked
    /// that the queue is not empty.
    std::unique_ptr<node> pop_head()
    {
        std::unique_ptr<node> old_head = std::move(head);
        head = std::move(old_head->next);
        return old_head;
    }

    /// Blocks until the queue is non-empty and returns the still-held head lock.
    std::unique_lock<std::mutex> wait_for_data()
    {
        std::unique_lock<std::mutex> head_lock(head_mutex);
        data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });
        // Returning the local by name lets the compiler move or elide it.
        return head_lock;
    }

    std::unique_ptr<node> wait_pop_head()
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        return pop_head();
    }

    std::unique_ptr<node> wait_pop_head(T& value)
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        // Assign before unlinking: if the assignment throws, the node stays queued.
        value = std::move(*head->data);
        return pop_head();
    }

    std::unique_ptr<node> try_pop_head()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if (head.get() == get_tail())
        {
            return std::unique_ptr<node>();
        }
        return pop_head();
    }

    std::unique_ptr<node> try_pop_head(T& value)
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if (head.get() == get_tail())
        {
            return std::unique_ptr<node>();
        }
        value = std::move(*head->data);
        return pop_head();
    }

public:
    /// Creates an empty queue consisting of the dummy node only.
    threadsafe_queue() :
        head(new node), tail(head.get())
    {
    }

    threadsafe_queue(const threadsafe_queue& other) = delete;
    threadsafe_queue& operator=(const threadsafe_queue& other) = delete;

    /// Frees the list iteratively. The implicit destructor would destroy the
    /// `unique_ptr` chain recursively, one stack frame group per node, which
    /// can overflow the stack for long queues.
    ~threadsafe_queue()
    {
        std::unique_ptr<node> current = std::move(head);
        while (current)
        {
            std::unique_ptr<node> rest = std::move(current->next);
            current = std::move(rest);
        }
    }

    /// Blocks until a value is available, then removes and returns it.
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_ptr<node> const old_head = wait_pop_head();
        return old_head->data;
    }

    /// Blocks until a value is available, then moves it into `value`.
    void wait_and_pop(T& value)
    {
        std::unique_ptr<node> const old_head = wait_pop_head(value);
    }

    /// Removes and returns the front value, or a null pointer if the queue is empty.
    std::shared_ptr<T> try_pop()
    {
        std::unique_ptr<node> const old_head = try_pop_head();
        return old_head ? old_head->data : std::shared_ptr<T>();
    }

    /// Moves the front value into `value`.
    /// @return true if a value was removed, false if the queue was empty
    ///         (`value` is left untouched in that case).
    bool try_pop(T& value)
    {
        std::unique_ptr<node> const old_head = try_pop_head(value);
        // unique_ptr's operator bool is explicit, so it cannot be returned as-is.
        return old_head != nullptr;
    }

    /// @return true if the queue held no values at the time of the check.
    bool empty()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        return (head.get() == get_tail());
    }

    /// Appends `new_value` and wakes one waiting consumer.
    void push(T new_value)
    {
        // Allocate outside the locks so the critical section is pointer updates only.
        std::shared_ptr<T> new_data(
            std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node> p(new node);
        {
            std::lock_guard<std::mutex> tail_lock(tail_mutex);
            tail->data = new_data;
            node* const new_tail = p.get();
            tail->next = std::move(p);
            tail = new_tail;
        }
        {
            // Waiters evaluate their predicate while holding head_mutex.
            // Passing through head_mutex here guarantees that a waiter has
            // either not yet checked the predicate (and will see the new
            // tail) or is already blocked (and will receive the notify);
            // without this the notification can be lost in between.
            std::lock_guard<std::mutex> head_lock(head_mutex);
        }
        data_cond.notify_one();
    }
};
无锁队列
已在 VS2017 和 VS2019 下编译通过(2019.9.11)。
采用内部、外部两个引用计数来管理节点内存。
#define _ENABLE_ATOMIC_ALIGNMENT_FIX 1
#include <iostream>
#include <memory>
#include <mutex>
#include <atomic>
#include <thread>
#include <condition_variable>
template<typename T>
class lock_free_queue
private:
struct node;
struct counted_node_ptr
std::uint64_t external_count = 0;
node* ptr = nullptr;
;
std::atomic<counted_node_ptr> head;
std::atomic<counted_node_ptr> tail;
struct node_counter
std::uint32_t internal_count : 30;
std::uint32_t external_counters : 2;
;
struct node
std::atomic<T*> data;
std::atomic<node_counter> count;
std::atomic<counted_node_ptr> next;
node():data(nullptr)
node_counter new_count;
new_count.internal_count = 0;
new_count.external_counters = 2;
count.store(new_count);
counted_node_ptr new_ptr;
new_ptr.ptr = nullptr;
new_ptr.external_count = 0;
next.store(new_ptr);
void release_ref()
node_counter old_counter =
count.load(std::memory_order_relaxed);
node_counter new_counter;
do
new_counter = old_counter;
--new_counter.internal_count;
while (!count.compare_exchange_strong(old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed));
if (!new_counter.internal_count && !new_counter.external_counters)
delete this;
;
void set_new_tail(counted_node_ptr& old_tail, counted_node_ptr const& new_tail)
node* const current_tail_ptr = old_tail.ptr;
while (!tail.compare_exchange_weak(old_tail, new_tail) && old_tail.ptr == current_tail_ptr);
if (old_tail.ptr == current_tail_ptr)
free_external_counter(old_tail);
else
current_tail_ptr->release_ref();
static void increase_external_count(
std::atomic<counted_node_ptr>& counter,
counted_node_ptr& old_counter)
counted_node_ptr new_counter;
do
new_counter = old_counter;
++new_counter.external_count;
while (!counter.compare_exchange_strong(
old_counter, new_counter,
std::memory_order_acquire, std::memory_order_relaxed));
old_counter.external_count = new_counter.external_count;
static void free_external_counter(counted_node_ptr& old_node_ptr)
node* const ptr = old_node_ptr.ptr;
int const count_increase = old_node_ptr.external_count - 2;
node_counter old_counter = ptr->count.load(std::memory_order_relaxed);
node_counter new_counter;
do
new_counter = old_counter;
--new_counter.external_counters;
new_counter.internal_count += count_increase;
while (!ptr->count.compare_exchange_strong(old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed));
if (!new_counter.internal_count && !new_counter.external_counters)
delete ptr;
public:
lock_free_queue()
counted_node_ptr newHead;
newHead.ptr = new node;
head.store(newHead);
tail = head.load();
lock_free_queue(const lock_free_queue&) = delete;
lock_free_queue& operator=(const lock_free_queue&) = delete;
~lock_free_queue()
counted_node_ptr old_head = head.load();
while (old_head.ptr)
old_head.ptr->next.load();
head.store(old_head.ptr->next.load());
delete old_head.ptr;
old_head = head.load();
void push(T new_value)
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = tail.load();
for (;;)
increase_external_count(tail, old_tail);
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get()))
counted_node_ptr old_next ;
if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next))
delete new_next.ptr;
new_next = old_next;
set_new_tail(old_tail, new_next);
new_data.release();
break;
else
counted_node_ptr old_next;
if (old_tail.ptr->next.compare_exchange_strong(old_next, new_next))
old_next = new_next;
new_next.ptr = new node;
set_new_tail(old_tail, old_next);
std::unique_ptr<T> pop()
counted_node_ptr old_head = head.load(std::memory_order_relaxed);
for (;;)
increase_external_count(head, old_head);
node* const ptr = old_head.ptr;
if (ptr == tail.load().ptr)
return std::unique_ptr<T>();
counted_node_ptr next = ptr->next.load();
if (head.compare_exchange_strong(old_head, next))
T* const res = ptr->data.exchange(nullptr);
free_external_counter(old_head);
return std::unique_ptr<T>(res);
ptr->release_ref();
;
// Test program

/// Fetches `size` values from `q` and prints each one.
///
/// pop() returns a null pointer when the queue is momentarily empty, so the
/// consumer yields and retries until a value arrives. It therefore blocks
/// forever unless at least `size` values are eventually pushed.
void consumer(lock_free_queue<int>& q, int size)
{
    for (int i = 0; i < size; ++i)
    {
        std::unique_ptr<int> value = q.pop();
        while (!value)
        {
            std::this_thread::yield();
            value = q.pop();
        }
        // Print the value itself; streaming the smart pointer prints an address.
        std::cout << "Consumer fetched" << *value << '\n';
    }
}
/// Pushes the integers 0 .. size-1 into `q`, printing a line after each push.
void producer(lock_free_queue<int>& q, int size)
{
    for (int i = 0; i < size; ++i)
    {
        q.push(i);
        std::cout << "Produced produced" << i << '\n';
    }
}
int main(int argc, const char** argv)
static const int BUFFER_SIZE = 10;
lock_free_queue<int> q;
auto start = std::chrono::system_clock::now();
std::thread c1(consumer, std::ref(q), BUFFER_SIZE);
std::thread p1(producer, std::ref(q), BUFFER_SIZE);
c1.join();
p1.join();
auto end = std::chrono::system_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
std::cout << elapsed.count() << '\\n';
return 0;
以上是关于C++11并发,有锁队列和无锁队列的主要内容,如果未能解决你的问题,请参考以下文章