CAS无锁队列的实现
Posted 清水寺扫地僧
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CAS无锁队列的实现相关的知识,希望对你有一定的参考价值。
1. 基本原理
源于1994年10月发表在国际并行与分布式会议上的论文【无锁队列的实现.pdf】。CAS(Compare And Swap,CAS维基百科)指令。CAS的实现可参考下面的代码:
bool CAS(int* pAddr, int nExpected, int nNew) atomically {
if(*pAddr == nExpected) {
*pAddr = nNew;
return true;
} else return false;
}
//CAS返回bool告知原子性交换是否成功
CAS 根据字面意思即可理解,就是对数据进行交换的一种原子操作。对应到 CPU 指令的话就是cmpchg
。
无锁队列的内部实现实际上也是原子操作,可以避免多线程调用出现的不可预知的情况,也即进行线程的并发同步。主要的核心就是函数__sync_bool_compare_and_swap
,返回类型是 bool 型,原子交换成功返回 true,失败返回 false。
2. 代码实现
2.1 使用链表实现无锁队列
入队操作:
Enqueue(EleType x) {
Node* q = new Node();
q->x = x;
q->next = nullptr;
do {
Node* p = tail;
} while(!CAS(p->next, nullptr, q));
CAS(tail, p, q);
}
这样实现的话会导致若是某个线程在执行入队操作时,在CAS(tail, p, q)
执行之前,也即将尾节点替换为新加入的入队节点前挂掉了,那么将导致其它的线程在执行入队操作CAS(p->next, nullptr, q)
时一直返回 false,即无限循环于此,因为 p->next 已经更改为 q,但是tail并没有更新为 q 仍然是 p 。
基于此,可以对以上的入队操作进行改进,在do while();当中找到无锁队列的实际尾节点,即p->next==nullptr
,再进行 tail 节点的替换,实现代码如下:
Enqueue(EleType x) {
Node* q= new Node();
q->x = x;
q->next = nullptr;
Node* p = tail;
Node* oldP = tail;
do {
while(p->next != nullptr)
p = p->next;
} while(!CAS(p->next, nullptr, q));
CAS(tail, oldP, q);
}
这样的话即使出现上文中讲述的更新 tail 前线程挂掉的情况,在进入到 do while(); 循环时,p 将会指定为无锁队列的实际尾部节点,从而CAS(p->next, nullptr, q)
返回 true,结束循环,更新 tail 节点值。
出队操作:
Dequeue() {
do {
Node* p = head->next;
if(p == nullptr)
return EMPTY;
} while(CAS(head->next, p, p->next));
return p->val;
}
//这里的head是一个哑头节点
模板无锁队列类的实现:
有了如上的思想,那么就可以实现一个模板无锁队列类,代码如下:
// 定义一个链表实现队列
template <typename ElemType>
struct qnode // 链表节点
{
struct qnode *_next;
ElemType _data;
};
template <typename ElemType>
class Queue
{
private:
struct qnode<ElemType> *volatile _head = NULL; // 随着pop后指向的位置是不一样的, head不是固定的
struct qnode<ElemType> *volatile _tail = NULL;
public:
Queue() {
_head = _tail = new qnode<ElemType>;
_head->_next = NULL;
_tail->_next = NULL;
printf("Queue _head:%p\\n", _head);
}
void push_list(const ElemType& e) {
struct qnode<ElemType>* p = new qnode<ElemType>;
if (!p) return ; //p生成失败
p->next = NULL;
p->data = e;
struct qnode<ElemType>* t = _tail;
struct qnode<ElemType>* old_t = _tail;
do {
// 当非NULL的时候说明不是尾部,因此需要指向下一个节点
while (t->next != NULL) {
t = t->next;
}
// 如果t->next为则null换为p
} while (!__sync_bool_compare_and_swap(&t->next, NULL, p));
// 如果尾部和原来的尾部相等,则换为p。
__sync_bool_compare_and_swap(&_tail, old_t, p);
}
bool pop_list(ElemType& e) { //e作为传出参数,记录对头的元素值
struct qnode<ElemType>* p = NULL;
do {
p = _head;
if (p->next == NULL) return false;
// 如果头部等于p,那么就指向p的下一个
} while (!__sync_bool_compare_and_swap(&_head, p, p->next));
e = p->data; //p为最初的head值,返回旧队头值
delete p;
p = NULL;
return true;
}
};
该模板类的实现,也即是记录队头和队尾节点,链表的头节点没有存放数据(同样是哑节点?):
- push 到队尾的时候,先判断当前指针是否是队列的实际尾部,即使用CAS进行判断是不是,如果是,则
p->next == nullptr
,将 p->next 替换为 q, 同时更新 tail 尾节点的值;如果不是,则在 do while(); 当中进行寻找实际队尾节点; - pop 出队头的时候,同样也是在 do while(); 循环当中判断队列是否非空,非空则将之前的节点值返回,并将其删除,更新 head 头节点的值;
2.2 使用数组实现环形无锁队列
此外,还可以使用数组实现,因为环形数组一经内存申请后,不会再涉及内存请求和释放:
- 队列实现的形式是环形数组的形式;
- 队列的元素的值,初始的时候是三种可能的值。HEAD、TAIL、EMPTY;
- 数组一开始所有的元素都初始化为 EMPTY。有两个相邻的元素初始化为 HEAD 与 TAIL,代表着空队列;
- 入队操作。假设数据 x 要入队列,定位 TAIL 的位置,使用 double-CAS 方法把 (TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到 (TAIL, EMPTY),则说明队列满了。
- 出队操作。定位 HEAD 的位置,把 (HEAD, x) 更新成 (EMPTY, HEAD),并把 x 返回。同样需要注意,如果 x 是 TAIL,则说明队列为空。
3. ABA 问题及解决
简单的说就是线程A将当前值修改为10,此时线程B将值改为11,然后又有一个线程C把值又改为10,这样的话对于线程A来说取到的内存值和当前值是没变的,所以可以更新,但实际上是经过变化的,所以不符合实际逻辑的。
注意到CAS比较的是指针取内容得到的值,那么,假定某个线程准备出队操作,首先声明一个指向p指针head结点,接着要进行CAS操作,CAS(head,p,p->next)。假定在执行CAS操作之前,有个线程进行了入队操作,此时,head!=p,正常情形CAS(head,p,p->next)应该返回为false。但是,在CAS(head,p,p->next)之前,又有线程进行了入队操作,而入队的这个结点占用的内存恰恰是最开始的时候p所指向的内存,再恰恰经过一系列出队操作,使得当前头指针刚好指向刚刚入队操作的那块结点,最后,才开始,进行CAS操作。我们会发现原本应该返回为false的CAS操作,返回了true!(CAS比较的是内存地址所存放的值,==)。
解决ABA问题,可加入版本号这一控制信息,Java中有AtomicStampedReference类可以添加版本在比对内存值的时候加以区分。
4. 参考资料
以上是关于CAS无锁队列的实现的主要内容,如果未能解决你的问题,请参考以下文章