一读多写非自旋无锁链表队列实现

Posted E-HERO

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一读多写非自旋无锁链表队列实现相关的知识,希望对你有一定的参考价值。

现在一说到多线程操作的队列,基本上都会标榜自己是无锁的,这样的无锁基本上都有两大特点:

1. 换了一种锁

利用原子变量自旋,本质还是自旋等锁,其实也是一种锁,只是用了更底层一点。这种无锁,其实也是消耗CPU

2. 数组结构

这样的队列大多都是用数组,数组的好处就是寻址方便,移动的时候只需要原子加1,但不好的地方就是空间分配不够灵活。我要是分配小了,在流量高峰可能就满了,但内存可能还可以足够,我要是分配大了,会一直占住这么多内存,内存分配不够灵活

 

本文讨论的场景是一种常见的场景,一读多写场景。例如:打日志的时候,本质就是往一个队列里面塞日志,另外会有一个线程读信息,然后写入磁盘;再例如,PRC里面,对一个fd要写数据,由于fd是不能被多线程共享同时写,这时候必须互斥,常见的做法就是多个线程写消息到一个队列,由一个线程来从队列读消息,写进fd,这样的好处是写fd的线程不用频繁切换,对于多路复用下连接更常见。

 

这里讨论的就是在这种一读多写的场景下,用非自旋,链表结构实现一读多写的队列。可以有多个写线程来同时写数据,这里的写是非阻塞的,只能有一个线程读,如果有多个线程同时读,只会有一个线程读到数据,同样,读也是非阻塞的,如果数据是空就返回读不到数据。

 

队列实现

这里的队列实现思想,主要参考了BRPC里面发送消息那部分的思想,是那种思想的一种改造

队列有一个队列头A,写B的时候与这个头做原子交换(不是cas,就是exchange),然后队头就变成了B,再把B的next指向A,如果我要写入A,B,C,过程如下:

写A:NULL<-A

写B:NULL<-A<-B

写C:NULL<-A<-B<-C

这里多线程写,不会阻塞,也不会自旋,并且保证数据都能写入

 

读这里就略微复杂,有些细节这里就不展开,看代码吧。读总的来说就是从当前的队列头,开始链表逆序,入上面举的例子,逆序完之后就变成了

A->B->C->NULL

如果逆序过程中有新数据D写入,就会这样

A->B->C<-D<-E (C的next还是NULL,只是C变成了B和D的next)

逆序后,我的cur指针指向之前的逆序前的队尾,逆序后左边链表的队头,也就是A,然后读就从A开始读,一直把C读完,这时候cur又指向NULL了

如果这整个逆序,包括读完C这个过程,没有新数据写入,这时候队列里面就没有新数据了 ,如果有新数据来,结构如下

C<-D<-E

这时候再逆序,由于每次逆序都会把这次逆序前的头给存下来(第一次逆序前的头就是C),所以这一次逆序会剔除掉上一次逆序前的头,第二次逆序完之后结构就变成了

D->E 

如果有新增数据

D->E<-F

就和上面第一次一样的了

 

性能

由于找不到一台线上机来跑,就在测试机上跑了一下(不是空跑,上面也有一堆服务,是共用的,我也是找不到一台基本空的线上机来跑,只好这样粗略来估了,都是穷啊)

48核机器上,开了48线程写,一线程读

写大约 650W+ 每秒  (由于不是空机器,跑了几次都有一定波动,最快一次写是800+W)  

读大约 320W+ 每秒

 

 

实现过程中遇到的一些问题

ABA问题

用链表来实现这种多线程队列,最常见的问题就算ABA问题(https://en.wikipedia.org/wiki/ABA_problem),ABA简单来说就是在多线程里面当你在不同时刻看到同一个值,这个值是否真的代表一次操作的结果还是多次不同操作的结果。例如内存地址,如果我用内存指针指向的地址判断这两个item是不是同一个东西,大概率是会出错的。因为内存地址会被回收重新使用,思考下面一顿操作

1. 写入一个值为2的数据,内存地址0x1234567

2.读取一个数据(步骤1中写入的2),并且释放这个item的内存空间

3.写入一个值为2的数据,内存地址0x1234567(大概率会重复)

这时候如果按照地址和值来对比,步骤1和步骤3写入的就是同一个东西,但实际上这应该是两个不同的东西,这就是ABA问题,在需要对比两个item的时候就可能引发混乱

解决ABA问题最简单的方法就是利用加上一个版本号,例如在BRPC里面封装的ResoucePool,为了防止ABA问题,在一个int64上低32位有一个版本号,比对的时候通过值和版本号对比。如果版本号重复,其实也会出问题,但是只要版本号是递增的,要重复也就需要在短时间内有2^32次方个操作,并且值一样,才能让两个item看起来一样,但实际场景中,很少会有这样频繁的操作(如果有,就改用64位的版本号 = = ),但总的来说,利用版本号也不是绝对避免ABA问题,还是分场景。

回到前面提到的ABA问题,整个过程中有个地方是要判断当前的队头,是否等于逆序前的队头(C),如果等于,就说明没有新数据写入,如果不等于就说明有新数据写入了,但是这个等于就很玄学了,如果你要用地址等于,读完逆序前的对头(C),把C delete之后,这时候刚好有新数据写入,很有可能新数据的地址就是原来C的地址,这个判断下来其实是有新数据写入了,但实际是上由于地址一样,是会当做没有新数据写入,所以最早是在这里加了版本号来解决这个问题

 

懒释放

虽然加上了版本号,判断是否有新数据写入这个ABA问题是解决了,但是又碰到个double free,当时用GDB跟了好久,也自己在纸上画了好久,才分析出来,逆序后,读完C   delete后,C的地址就释放了,再次逆序的时候,由于D的next还指向C,第二次逆序的时候,就会把C也当做合法元素算进去逆序,这时候问题就在于,C已经delete了,他的地址很可能被复用了,里面已经是其他合法元素了(还是ABA问题),并且刚刚写进队列,还不在本次逆序范围内(也就是逆序开始后才被写入队列),这时候就可能出翔。这时候也不好通过版本号来看,两个问题:

1. C的地址已经释放了,没有复用,去判断C地址的版本号就是访问非法内存,就算读到了数据,读完一释放,就double free了

2. C的地址已经释放了,已经复用,按说版本号递增,后面的版本号大于前面的版本号,判断是否小于,但是,这个假设有问题,这种底层队列调用量可能很大,版本号可能会溢出,这个也不科学

所以最后想到的就是懒释放,我读完一个元素之后先不是释放,留着,这样保证他的地址不会被复用,后面如果是要逆序也可以用这个地址来判断,保证唯一性。只需要在读完这个元素后释放上一个元素,或者在逆序读完数据后没有新数据写入时释放,这样也可以从另外一个层面避免ABA问题

 

以上就是我去年(2018年新年前)探索的一个非自旋,非阻塞,无锁一读多写链表队列

PS:去年写的时候就卡在double free,当时怀疑是用了boost的内存池引起的,刚好也是新年前一天,没时间了,就耽搁到现在。这次也是在新年前把这个问题给搞定了,这个中间也是经历不少事情,来回奔波,回头写这段代码,真是很有感触

记于  2019年2月2日(新年假期最后一个工作日)

 

附上代码

#ifndef ZAC_WMQUEUE_H
#define ZAC_WMQUEUE_H

#include <atomic>
#include <cstdlib>

template <typename T>
class MWSRQueue 
public:
    enum STATUS 
        READING = 1,
        WAITING = 0,
    ;

    MWSRQueue() 
        _head = NULL;
        _cur = NULL;
        _last = -1;
        _reading = 0;
        _version = 0;
    

    virtual int MWrite(T t) 
        Item* it = new Item;
        it->next = NULL;
        it->t = t;
        it->version = _version.fetch_add(1, std::memory_order_acq_rel);

        Item* pre_head = _head.exchange(it);
        it->next = pre_head;

        if (pre_head == NULL) 
            // It means no read
            return 1;
        
        return 0;
    

    virtual int SRead(T& t) 
        int status = WAITING;
        if (!_reading.compare_exchange_strong(status, READING))
            return -1;
        if (_cur == NULL) 
            if (_head == NULL) 
                _reading.store(WAITING);
                return 1; // empty
             else 
                // no new item add
                if (_head.load(std::memory_order_acquire)->version == _last) 
                    _reading.store(WAITING);
                    return 1; // empty
                

                Item* head = _head.load(std::memory_order_acquire);
                Item* it1 = head, *it2 = head->next;

                head->next = NULL;
                while (it2 != NULL && it2->version != _last  && it2 != _tail) 
                    Item* tmp = it2->next;
                    it2->next = it1;
                    it1 = it2;
                    it2 = tmp;
                
                _cur = it1;
                _last = head->version;
            
        

        t = _cur->t;
        Item* del = _cur;
        if (_head.compare_exchange_strong(del, NULL, std::memory_order_acq_rel)) 
            delete del;
            _tail = NULL;
         else 
            if (_tail != NULL) 
                delete _tail;
             
            _tail = _cur;
        
        _cur = _cur->next;
        _reading.store(WAITING);
        return 0;
    

private:
    struct Item 
        T t;
        Item* next;
        int version;
        bool operator ==(const Item & it) 
            return &it == this && version = it.version;
        

        bool operator != (const Item & it) 
            return &it != this || version != it.version;
        
    ;

    std::atomic<Item*> _head;
    int _last;
    Item* _cur;
    Item* _tail;
    std::atomic_int _reading;
    std::atomic_int _version;
;


#endif //ZAC_WMQUEUE_H

 

因为前人,所以更高

1. BRPC 发消息:https://github.com/brpc/brpc/blob/master/docs/cn/io.md#%E5%8F%91%E6%B6%88%E6%81%AF

2. BRPC 防止ABA问题:https://github.com/brpc/brpc/blob/master/docs/cn/memory_management.md#%E7%94%9F%E6%88%90bthread_t

3. ABA问题: https://en.wikipedia.org/wiki/ABA_problem

以上是关于一读多写非自旋无锁链表队列实现的主要内容,如果未能解决你的问题,请参考以下文章

三态门实现“一读多写”总线结构

无锁链表的性能比锁定链表差

常遇到读多写少,教你用ReadWriteLock实现一个通用的缓存中心

无锁链表

偏向锁,偏向线程id ,自旋锁

如何用ReadWriteLock实现一个通用的缓存中心?