一读多写非自旋无锁链表队列实现
Posted E-HERO
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一读多写非自旋无锁链表队列实现相关的知识,希望对你有一定的参考价值。
现在一说到多线程操作的队列,基本上都会标榜自己是无锁的,这样的无锁基本上都有两大特点:
1. 换了一种锁
利用原子变量自旋,本质还是自旋等锁,其实也是一种锁,只是用了更底层一点。这种无锁,其实也是消耗CPU
2. 数组结构
这样的队列大多都是用数组,数组的好处就是寻址方便,移动的时候只需要原子加1,但不好的地方就是空间分配不够灵活。我要是分配小了,在流量高峰可能就满了,但内存可能还可以足够,我要是分配大了,会一直占住这么多内存,内存分配不够灵活
本文讨论的场景是一种常见的场景,一读多写场景。例如:打日志的时候,本质就是往一个队列里面塞日志,另外会有一个线程读信息,然后写入磁盘;再例如,PRC里面,对一个fd要写数据,由于fd是不能被多线程共享同时写,这时候必须互斥,常见的做法就是多个线程写消息到一个队列,由一个线程来从队列读消息,写进fd,这样的好处是写fd的线程不用频繁切换,对于多路复用下连接更常见。
这里讨论的就是在这种一读多写的场景下,用非自旋,链表结构实现一读多写的队列。可以有多个写线程来同时写数据,这里的写是非阻塞的,只能有一个线程读,如果有多个线程同时读,只会有一个线程读到数据,同样,读也是非阻塞的,如果数据是空就返回读不到数据。
队列实现
这里的队列实现思想,主要参考了BRPC里面发送消息那部分的思想,是那种思想的一种改造
写
队列有一个队列头A,写B的时候与这个头做原子交换(不是cas,就是exchange),然后队头就变成了B,再把B的next指向A,如果我要写入A,B,C,过程如下:
写A:NULL<-A
写B:NULL<-A<-B
写C:NULL<-A<-B<-C
这里多线程写,不会阻塞,也不会自旋,并且保证数据都能写入
读
读这里就略微复杂,有些细节这里就不展开,看代码吧。读总的来说就是从当前的队列头,开始链表逆序,入上面举的例子,逆序完之后就变成了
A->B->C->NULL
如果逆序过程中有新数据D写入,就会这样
A->B->C<-D<-E (C的next还是NULL,只是C变成了B和D的next)
逆序后,我的cur指针指向之前的逆序前的队尾,逆序后左边链表的队头,也就是A,然后读就从A开始读,一直把C读完,这时候cur又指向NULL了
如果这整个逆序,包括读完C这个过程,没有新数据写入,这时候队列里面就没有新数据了 ,如果有新数据来,结构如下
C<-D<-E
这时候再逆序,由于每次逆序都会把这次逆序前的头给存下来(第一次逆序前的头就是C),所以这一次逆序会剔除掉上一次逆序前的头,第二次逆序完之后结构就变成了
D->E
如果有新增数据
D->E<-F
就和上面第一次一样的了
性能
由于找不到一台线上机来跑,就在测试机上跑了一下(不是空跑,上面也有一堆服务,是共用的,我也是找不到一台基本空的线上机来跑,只好这样粗略来估了,都是穷啊)
48核机器上,开了48线程写,一线程读
写大约 650W+ 每秒 (由于不是空机器,跑了几次都有一定波动,最快一次写是800+W)
读大约 320W+ 每秒
实现过程中遇到的一些问题
ABA问题
用链表来实现这种多线程队列,最常见的问题就算ABA问题(https://en.wikipedia.org/wiki/ABA_problem),ABA简单来说就是在多线程里面当你在不同时刻看到同一个值,这个值是否真的代表一次操作的结果还是多次不同操作的结果。例如内存地址,如果我用内存指针指向的地址判断这两个item是不是同一个东西,大概率是会出错的。因为内存地址会被回收重新使用,思考下面一顿操作
1. 写入一个值为2的数据,内存地址0x1234567
2.读取一个数据(步骤1中写入的2),并且释放这个item的内存空间
3.写入一个值为2的数据,内存地址0x1234567(大概率会重复)
这时候如果按照地址和值来对比,步骤1和步骤3写入的就是同一个东西,但实际上这应该是两个不同的东西,这就是ABA问题,在需要对比两个item的时候就可能引发混乱
解决ABA问题最简单的方法就是利用加上一个版本号,例如在BRPC里面封装的ResoucePool,为了防止ABA问题,在一个int64上低32位有一个版本号,比对的时候通过值和版本号对比。如果版本号重复,其实也会出问题,但是只要版本号是递增的,要重复也就需要在短时间内有2^32次方个操作,并且值一样,才能让两个item看起来一样,但实际场景中,很少会有这样频繁的操作(如果有,就改用64位的版本号 = = ),但总的来说,利用版本号也不是绝对避免ABA问题,还是分场景。
回到前面提到的ABA问题,整个过程中有个地方是要判断当前的队头,是否等于逆序前的队头(C),如果等于,就说明没有新数据写入,如果不等于就说明有新数据写入了,但是这个等于就很玄学了,如果你要用地址等于,读完逆序前的对头(C),把C delete之后,这时候刚好有新数据写入,很有可能新数据的地址就是原来C的地址,这个判断下来其实是有新数据写入了,但实际是上由于地址一样,是会当做没有新数据写入,所以最早是在这里加了版本号来解决这个问题
懒释放
虽然加上了版本号,判断是否有新数据写入这个ABA问题是解决了,但是又碰到个double free,当时用GDB跟了好久,也自己在纸上画了好久,才分析出来,逆序后,读完C delete后,C的地址就释放了,再次逆序的时候,由于D的next还指向C,第二次逆序的时候,就会把C也当做合法元素算进去逆序,这时候问题就在于,C已经delete了,他的地址很可能被复用了,里面已经是其他合法元素了(还是ABA问题),并且刚刚写进队列,还不在本次逆序范围内(也就是逆序开始后才被写入队列),这时候就可能出翔。这时候也不好通过版本号来看,两个问题:
1. C的地址已经释放了,没有复用,去判断C地址的版本号就是访问非法内存,就算读到了数据,读完一释放,就double free了
2. C的地址已经释放了,已经复用,按说版本号递增,后面的版本号大于前面的版本号,判断是否小于,但是,这个假设有问题,这种底层队列调用量可能很大,版本号可能会溢出,这个也不科学
所以最后想到的就是懒释放,我读完一个元素之后先不是释放,留着,这样保证他的地址不会被复用,后面如果是要逆序也可以用这个地址来判断,保证唯一性。只需要在读完这个元素后释放上一个元素,或者在逆序读完数据后没有新数据写入时释放,这样也可以从另外一个层面避免ABA问题
以上就是我去年(2018年新年前)探索的一个非自旋,非阻塞,无锁一读多写链表队列
PS:去年写的时候就卡在double free,当时怀疑是用了boost的内存池引起的,刚好也是新年前一天,没时间了,就耽搁到现在。这次也是在新年前把这个问题给搞定了,这个中间也是经历不少事情,来回奔波,回头写这段代码,真是很有感触
记于 2019年2月2日(新年假期最后一个工作日)
附上代码
#ifndef ZAC_WMQUEUE_H
#define ZAC_WMQUEUE_H
#include <atomic>
#include <cstdlib>
template <typename T>
class MWSRQueue
public:
enum STATUS
READING = 1,
WAITING = 0,
;
MWSRQueue()
_head = NULL;
_cur = NULL;
_last = -1;
_reading = 0;
_version = 0;
virtual int MWrite(T t)
Item* it = new Item;
it->next = NULL;
it->t = t;
it->version = _version.fetch_add(1, std::memory_order_acq_rel);
Item* pre_head = _head.exchange(it);
it->next = pre_head;
if (pre_head == NULL)
// It means no read
return 1;
return 0;
virtual int SRead(T& t)
int status = WAITING;
if (!_reading.compare_exchange_strong(status, READING))
return -1;
if (_cur == NULL)
if (_head == NULL)
_reading.store(WAITING);
return 1; // empty
else
// no new item add
if (_head.load(std::memory_order_acquire)->version == _last)
_reading.store(WAITING);
return 1; // empty
Item* head = _head.load(std::memory_order_acquire);
Item* it1 = head, *it2 = head->next;
head->next = NULL;
while (it2 != NULL && it2->version != _last && it2 != _tail)
Item* tmp = it2->next;
it2->next = it1;
it1 = it2;
it2 = tmp;
_cur = it1;
_last = head->version;
t = _cur->t;
Item* del = _cur;
if (_head.compare_exchange_strong(del, NULL, std::memory_order_acq_rel))
delete del;
_tail = NULL;
else
if (_tail != NULL)
delete _tail;
_tail = _cur;
_cur = _cur->next;
_reading.store(WAITING);
return 0;
private:
struct Item
T t;
Item* next;
int version;
bool operator ==(const Item & it)
return &it == this && version = it.version;
bool operator != (const Item & it)
return &it != this || version != it.version;
;
std::atomic<Item*> _head;
int _last;
Item* _cur;
Item* _tail;
std::atomic_int _reading;
std::atomic_int _version;
;
#endif //ZAC_WMQUEUE_H
因为前人,所以更高
1. BRPC 发消息:https://github.com/brpc/brpc/blob/master/docs/cn/io.md#%E5%8F%91%E6%B6%88%E6%81%AF
2. BRPC 防止ABA问题:https://github.com/brpc/brpc/blob/master/docs/cn/memory_management.md#%E7%94%9F%E6%88%90bthread_t
3. ABA问题: https://en.wikipedia.org/wiki/ABA_problem
以上是关于一读多写非自旋无锁链表队列实现的主要内容,如果未能解决你的问题,请参考以下文章