C++并发编程----无锁实现线程安全队列(《C++ Concurrency in Action》 读书笔记)
Posted 小丑快学习
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++并发编程----无锁实现线程安全队列(《C++ Concurrency in Action》 读书笔记)相关的知识,希望对你有一定的参考价值。
采用计数器的方式防止条件竞争。
template<typename T>
class lock_free_queue
{
private:
struct node;
struct counted_node_ptr
{
int external_count;//记录外部线程通过指针的引用
node* ptr;
};
std::atomic<counted_node_ptr> head;
std::atomic<counted_node_ptr> tail; // 1
//计数器
struct node_counter
{
unsigned internal_count : 30;
unsigned external_counters : 2; // 大小不会超过2,同时最多由next,和head或者tail指向。
};
struct node
{
std::atomic<T*> data;
std::atomic<node_counter> count; // 3
counted_node_ptr next;
node()
{
node_counter new_count;
new_count.internal_count = 0;
new_count.external_counters = 2; // next 和 tail 同时指向,因为新创建的节点将会放入到队列中
count.store(new_count);
next.ptr = nullptr;
next.external_count = 0;
}
/*
释放对当前节点的引用,释放节点引用时语言将该节点得到内部计数减一,
如果减一后节点的外部引用以及内部引用计数为零,则删除该节点。
释放操作由节点自己完成,所以该函数为node的成员函数。
所以该函数释放的是该函数的计数器的外部引用节点。
*/
void release_ref()
{
node_counter old_counter = count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter = old_counter;
--new_counter.internal_count; // 内部计数减一
} while (!count.compare_exchange_strong( // 2
old_counter, new_counter,
std::memory_order_acquire, std::memory_order_relaxed));
if (!new_counter.internal_count &&
!new_counter.external_counters)
{
delete this; // 3
}
}//release_ref
};//node
/*
增加对对节点的外部引用计数,
同时增加两个计数指针的引用,
增加的是外部节点的引用,而非节点的引用计数器。
*/
static void increase_external_count(
std::atomic<counted_node_ptr>& counter,
counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter = old_counter;
++new_counter.external_count;
} while (!counter.compare_exchange_strong(
old_counter, new_counter,
std::memory_order_acquire, std::memory_order_relaxed));
old_counter.external_count = new_counter.external_count;
}//increase_external_count
/*
释放一个节点的引用,同时将外部节点的引用计数加入到内部节点中,
当节点的引用计数为零时将节点删除。计数器中的外部引用也会减一
*/
static void free_external_counter(counted_node_ptr
& old_node_ptr)
{
node* const ptr = old_node_ptr.ptr;
int const count_increase = old_node_ptr.external_count - 2;
node_counter old_counter =
ptr->count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter = old_counter;
--new_counter.external_counters; // 1 计数器外部节点引用减一,此时tail和next已经没有指向该节点
new_counter.internal_count += count_increase; // 2 将外部节点引用的计数加到内部节点引用
} while (!ptr->count.compare_exchange_strong( // 3
old_counter, new_counter,
std::memory_order_acquire, std::memory_order_relaxed));
if (!new_counter.internal_count &&
!new_counter.external_counters)
{
delete ptr; // 如果引用为零,便删除节点
}
}//free_external_counter
public:
/*
数据进对队列成功后会将计数器的外部计数减一,
因为此时只有一个next指向节点
*/
void push(T new_value)
{
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = tail.load();
for (;;)
{
increase_external_count(tail, old_tail); // 增加外部节点的引用(非计数器)
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong( // 6
old_data, new_data.get()))
{
old_tail.ptr->next = new_next;
old_tail = tail.exchange(new_next);
free_external_counter(old_tail); // 此时内部计数的外部计数将减一
new_data.release();
break;
}
old_tail.ptr->release_ref();//指针离开节点则内部计数减一
}
}//push
std::unique_ptr<T> pop()
{
counted_node_ptr
old_head = head.load(std::memory_order_relaxed); // 1
for (;;)
{
increase_external_count(head, old_head); // 增加节点引用(非计数器引用)
node* const ptr = old_head.ptr;
if (ptr == tail.load().ptr)//条件满足则队列为空
{
ptr->release_ref(); // 3
return std::unique_ptr<T>();
}
if (head.compare_exchange_strong(old_head, ptr->next)) // 条件满足则取出数据
{
T* const res = ptr->data.exchange(nullptr);//取出数据
free_external_counter(old_head); //释放计数器节点
return std::unique_ptr<T>(res);
}
ptr->release_ref(); // 指针离开节点,则内部数据减一
}
}//pop
};
在push操作中,代码 6 ,if (old_tail.ptr->data.compare_exchange_strong( old_data, new_data.get()))
条件失败就会导致一直循环,从而导致cpu一直被占用,因此,这种情况实际上类似于自旋锁,这将使得效率非常的低,因此,我们可以想办法让忙等待的线程帮助其他线程完成操作,因而减少忙等带来的浪费。
template<typename T>
class lock_free_queue
{
private:
struct node
{
std::atomic<T*> data;
std::atomic<node_counter> count;
std::atomic<counted_node_ptr> next; // 1
};
public:
std::unique_ptr<T> pop()
{
counted_node_ptr
old_head=head.load(std::memory_order_relaxed);
for(;;)
{
increase_external_count(head,old_head);
node* const ptr=old_head.ptr;
if(ptr==tail.load().ptr)
{
return std::unique_ptr<T>();
}
counted_node_ptr next=ptr->next.load(); // 2
if(head.compare_exchange_strong(old_head,next))
{
T* const res=ptr->data.exchange(nullptr);
free_external_counter(old_head);
return std::unique_ptr<T>(res);
}
ptr->release_ref();
}
}
/*
更新尾节点
*/
void set_new_tail(counted_node_ptr& old_tail, // 1
counted_node_ptr const& new_tail)
{
node* const current_tail_ptr = old_tail.ptr;
while (!tail.compare_exchange_weak(old_tail, new_tail) &&
old_tail.ptr == current_tail_ptr);//如果没有线程修改尾节点或者pop尾节点,那么修改尾节点
if (old_tail.ptr == current_tail_ptr) // 3
free_external_counter(old_tail); // 4
else
current_tail_ptr->release_ref(); // 5
}
public:
//提高性能的push
void push(T new_value)
{
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = tail.load();
for (;;)
{
increase_external_count(tail, old_tail);
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong( // 6 将数据放入到原来的节点中
old_data, new_data.get()))
{
counted_node_ptr old_next = { 0 };//old_next 应该为空
if (!old_tail.ptr->next.compare_exchange_strong( // 7
old_next, new_next))//如果原来的next不空,则说明已经有其他线程已经修改尾指针
{
delete new_next.ptr; // 8 此时数据已经push成功,因此释放new出的节点
new_next = old_next; // 9 new_next也应该置空
}
set_new_tail(old_tail, new_next);//修改tail指针
new_data.release();
break;
}
else // 10 说明其他线程已经修改数据
{
counted_node_ptr old_next = { 0 };
if (old_tail.ptr->next.compare_exchange_strong( // 如果还未修改尾指针,则由该线程修改
old_next, new_next))//此处是复制操作,非指针操作,但是ptr指针为浅拷贝
{
old_next = new_next; // 12 old_next 指向的node和old_tail指向的是同一个
new_next.ptr = new node; // 13 因为该线程new的节点已经称为了tail,因此,需要重新new
}
set_new_tail(old_tail, old_next); // 更新节点指针,tail指向下一个节点
}
}
}
};
以上是关于C++并发编程----无锁实现线程安全队列(《C++ Concurrency in Action》 读书笔记)的主要内容,如果未能解决你的问题,请参考以下文章
C++并发编程----实现线无锁线程安全的数据结构(《C++ Concurrency in Action》 读书笔记)
C++并发编程----利用锁实现线程安全的数据结构(《C++ Concurrency in Action》 读书笔记)
c++11 并发队列的生产方案 BlockingConcurrentQueue