C#中使用CAS实现无锁算法

Posted 黑洞视界

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C#中使用CAS实现无锁算法相关的知识,希望对你有一定的参考价值。

CAS 的基本概念

CAS(Compare-and-Swap)是一种多线程并发编程中常用的原子操作,用于实现多线程间的同步和互斥访问。 它操作通常包含三个参数:一个内存地址(通常是一个共享变量的地址)、期望的旧值和新值。

CompareAndSwap(内存地址,期望的旧值,新值)

CAS 操作会比较内存地址处的值与期望的旧值是否相等,如果相等,则将新值写入该内存地址; 如果不相等,则不进行任何操作。这个比较和交换的操作是一个原子操作,不会被其他线程中断。

CAS 通常是通过硬件层面的CPU指令实现的,其原子性是由硬件保证的。具体的实现方式根据环境会有所不同。

CAS 操作通常会有一个返回值,用于表示操作是否成功。返回结果可能是true或false,也可能是内存地址处的旧值。

相比于传统的锁机制,CAS 有一些优势:

  • 原子性:CAS 操作是原子的,不需要额外的锁来保证多线程环境下的数据一致性,避免了锁带来的性能开销和竞争条件。

  • 无阻塞:CAS 操作是无阻塞的,不会因为资源被锁定而导致线程的阻塞和上下文切换,提高了系统的并发性和可伸缩性。

  • 适用性:CAS 操作可以应用于广泛的数据结构和算法,如自旋锁、计数器、队列等,使得它在实际应用中具有较大的灵活性和适用性。

C# 中如何使用 CAS

在 C# 中,我们可以使用 Interlocked 类来实现 CAS 操作。

Interlocked 类提供了一组 CompareExchange 的重载方法,用于实现不同类型的数据的 CAS 操作。

public static int CompareExchange(ref int location1, int value, int comparand);
public static long CompareExchange(ref long location1, long value, long comparand);
// ... 省略其他重载方法
public static object CompareExchange(ref object location1, object value, object comparand);
public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;

CompareExchange 方法将 location1 内存地址处的值与 comparand 比较,如果相等,则将 value 写入 location1 内存地址处,否则不进行任何操作。
该方法返回 location1 内存地址处的值。

通过判断方法返回值与 comparand 是否相等,我们就可以知道 CompareExchange 方法是否执行成功。

算法示例

在使用 CAS 实现无锁算法时,通常我们不光是为了比较和更新一个数据,还需要在更新成功后进行下一步的操作。结合 while(true) 循环,我们可以不断地尝试更新数据,直到更新成功为止。
伪代码如下:

while (true)

    // 读取数据
    oldValue = ...;
    // 计算新值
    newValue = ...;
    // CAS 更新数据
    result = CompareExchange(ref location, newValue, oldValue);
    // 判断 CAS 是否成功
    if (result == oldValue)
    
        // CAS 成功,执行后续操作
        break;
    

在复杂的无锁算法中,因为每一步操作都是独立的,连续的操作并非原子,所以我们不光要借助 CAS,每一步操作前都应判断是否有其他线程已经修改了数据。

示例1:计数器

下面是一个简单的计数器类,它使用 CAS 实现了一个线程安全的自增操作。

public class Counter

    private int _value;

    public int Increment()
    
        while (true)
        
            int oldValue = _value;
            int newValue = oldValue + 1;
            int result = Interlocked.CompareExchange(ref _value, newValue, oldValue);
            if (result == oldValue)
            
                return newValue;
            
        
    

CLR 底层源码中,我们也会经常看到这样的代码,比如 ThreadPool 增加线程时的计数器。
https://github.com/dotnet/runtime/blob/release/6.0/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs#L446

internal void EnsureThreadRequested()

    //
    // If we have not yet requested #procs threads, then request a new thread.
    //
    // CoreCLR: Note that there is a separate count in the VM which has already been incremented
    // by the VM by the time we reach this point.
    //
    int count = _separated.numOutstandingThreadRequests;
    while (count < Environment.ProcessorCount)
    
        int prev = Interlocked.CompareExchange(ref _separated.numOutstandingThreadRequests, count + 1, count);
        if (prev == count)
        
            ThreadPool.RequestWorkerThread();
            break;
        
        count = prev;
    

示例2:队列

下面是一个简单的队列类,它使用 CAS 实现了一个线程安全的入队和出队操作。相较于上面的计数器,这里的操作更加复杂,我们每一步都需要考虑是否有其他线程已经修改了数据。

这样的算法有点像薛定谔的猫,你不知道它是死是活,只有当你试图去观察它的时候,它才可能会变成死或者活。

public class ConcurrentQueue<T>

    // _head 和 _tail 是两个伪节点,_head._next 指向队列的第一个节点,_tail 指向队列的最后一个节点。
    // _head 和 _tail 会被多个线程修改和访问,所以要用 volatile 修饰。
    private volatile Node _head;
    private volatile Node _tail;
    
    public ConcurrentQueue()
    
        _head = new Node(default);
        // _tail 指向 _head 时,队列为空。
        _tail = _head;
    

    public void Enqueue(T item)
    
        var node = new Node(item);
        while (true)
        
            Node tail = _tail;
            Node next = tail._next;
            // 判断给 next 赋值的这段时间,是否有其他线程修改过 _tail
            if (tail == _tail)
            
                // 如果 next 为 null,则说明从给 tail 赋值到给 next 赋值这段时间,没有其他线程修改过 tail._next,
                if (next == null)
                
                    // 如果 tail._next 为 null,则说明从给 tail 赋值到这里,没有其他线程修改过 tail._next,
                    // tail 依旧是队列的最后一个节点,我们就可以直接将 node 赋值给 tail._next。                                
                    if (Interlocked.CompareExchange(ref tail._next, node, null) == null)
                    
                        // 如果_tail == tail,则说明从上一步 CAS 操作到这里,没有其他线程修改过 _tail,也就是没有其他线程执行过 Enqueue 操作。
                        // 那么当前线程 Enqueue 的 node 就是队列的最后一个节点,我们就可以直接将 node 赋值给 _tail。
                        Interlocked.CompareExchange(ref _tail, node, tail);
                        break;
                    
                
                // 如果 next 不为 null,则说明从给 tail 赋值到给 next 赋值这段时间,有其他线程修改过 tail._next,
                else
                
                    // 如果没有其他线程修改过 _tail,那么 next 就是队列的最后一个节点,我们就可以直接将 next 赋值给 _tail。
                    Interlocked.CompareExchange(ref _tail, next, tail);
                
            
        
    

    public bool TryDequeue(out T item)
    
        while (true)
        
            Node head = _head;
            Node tail = _tail;
            Node next = head._next;
            // 判断 _head 是否被修改过
            // 如果没有被修改过,说明从给 head 赋值到给 next 赋值这段时间,没有其他线程执行过 Dequeue 操作。          
            if (head == _head)
            
                // 如果 head == tail,说明队列为空
                if (head == tail)
                
                    // 虽然上面已经判断过队列是否为空,但是在这里再判断一次
                    // 是为了防止在给 tail 赋值到给 next 赋值这段时间,有其他线程执行过 Enqueue 操作。
                    if (next == null)
                    
                        item = default;
                        return false;
                    

                    // 如果 next 不为 null,则说明从给 tail 赋值到给 next 赋值这段时间,有其他线程修改过 tail._next,也就是有其他线程执行过 Enqueue 操作。
                    // 那么 next 就可能是队列的最后一个节点,我们尝试将 next 赋值给 _tail。
                    Interlocked.CompareExchange(ref _tail, next, tail);
                
                // 如果 head != tail,说明队列不为空
                else
                
                    item = next._item;
                    if (Interlocked.CompareExchange(ref _head, next, head) == head)
                    
                        // 如果 _head 没有被修改过
                        // 说明从给 head 赋值到这里,没有其他线程执行过 Dequeue 操作,上面的 item 就是队列的第一个节点的值。
                        // 我们就可以直接返回。
                        break;
                    
                    // 如果 _head 被修改过
                    // 说明从给 head 赋值到这里,有其他线程执行过 Dequeue 操作,上面的 item 就不是队列的第一个节点的值。
                    // 我们就需要重新执行 Dequeue 操作。
                
            
        

        return true;
    

    private class Node
    
        public readonly T _item;
        public Node _next;

        public Node(T item)
        
            _item = item;
        
    

我们可以通过以下代码来进行测试

using System.Collections.Concurrent;

var queue = new ConcurrentQueue<int>();
var results = new ConcurrentBag<int>();
int dequeueRetryCount = 0;

var enqueueTask = Task.Run(() =>

    // 确保 Enqueue 前 dequeueTask 已经开始运行
    Thread.Sleep(10);
    Console.WriteLine("Enqueue start");
    Parallel.For(0, 100000, i => queue.Enqueue(i));
    Console.WriteLine("Enqueue done");
);

var dequeueTask = Task.Run(() =>

    Thread.Sleep(10);
    Console.WriteLine("Dequeue start");
    Parallel.For(0, 100000, i =>
    
        while (true)
        
            if (queue.TryDequeue(out int result))
            
                results.Add(result);
                break;
            

            Interlocked.Increment(ref dequeueRetryCount);
        
    );
    Console.WriteLine("Dequeue done");
);

await Task.WhenAll(enqueueTask, dequeueTask);
Console.WriteLine(
    $"Enqueue and dequeue done, total data count: results.Count, dequeue retry count: dequeueRetryCount");

var hashSet = results.ToHashSet();
for (int i = 0; i < 100000; i++)

    if (!hashSet.Contains(i))
    
        Console.WriteLine("Error, missing " + i);
        break;
    


Console.WriteLine("Done");

输出结果:

Dequeue start
Enqueue start
Enqueue done
Dequeue done
Enqueue and dequeue done, total data count: 100000, dequeue retry count: 10586
Done

上述的 retry count 为 797,说明在 100000 次的 Dequeue 操作中,有 10586 次的 Dequeue 操作需要重试,那是因为在 Dequeue 操作中,可能暂时没有数据可供 Dequeue,需要等待其他线程执行 Enqueue 操作。

当然这个 retry count 是不稳定的,因为在多线程环境下,每次执行的结果都可能不一样。

总结

CAS 操作是一种乐观锁,它假设没有其他线程修改过数据,如果没有修改过,那么就直接修改数据,如果修改过,那么就重新获取数据,再次尝试修改。

在借助 CAS 实现较为复杂的数据结构时,我们不光要依靠 CAS 操作,还需要注意每次操作的数据是否被其他线程修改过,考虑各个可能的分支,以及在不同的分支中,如何处理数据。

欢迎关注个人技术公众号

CAS无锁队列的实现



1. 基本原理

源于1994年10月发表在国际并行与分布式会议上的论文【无锁队列的实现.pdf】。CAS(Compare And Swap,CAS维基百科)指令。CAS的实现可参考下面的代码:

bool CAS(int* pAddr, int nExpected, int nNew) atomically {
	if(*pAddr == nExpected) {
		*pAddr = nNew;
		return true;
	} else return false;
}
//CAS返回bool告知原子性交换是否成功

CAS 根据字面意思即可理解,就是对数据进行交换的一种原子操作。对应到 CPU 指令的话就是cmpchg
无锁队列的内部实现实际上也是原子操作,可以避免多线程调用出现的不可预知的情况,也即进行线程的并发同步。主要的核心就是函数__sync_bool_compare_and_swap,返回类型是 bool 型,原子交换成功返回 true,失败返回 false。



2. 代码实现


2.1 使用链表实现无锁队列

入队操作

Enqueue(EleType x) {
	Node* q = new Node();
	q->x = x;
	q->next = nullptr;
	do {
		Node* p = tail;
	} while(!CAS(p->next, nullptr, q));
	CAS(tail, p, q);
}

这样实现的话会导致若是某个线程在执行入队操作时,在CAS(tail, p, q)执行之前,也即将尾节点替换为新加入的入队节点前挂掉了,那么将导致其它的线程在执行入队操作CAS(p->next, nullptr, q)时一直返回 false,即无限循环于此,因为 p->next 已经更改为 q,但是tail并没有更新为 q 仍然是 p 。

基于此,可以对以上的入队操作进行改进,在do while();当中找到无锁队列的实际尾节点,即p->next==nullptr,再进行 tail 节点的替换,实现代码如下:

Enqueue(EleType x) {
	Node* q= new Node();
	q->x = x;
	q->next = nullptr;
	
	Node* p = tail;
	Node* oldP = tail;
	do {
		while(p->next != nullptr)
			p = p->next;
	} while(!CAS(p->next, nullptr, q));
	CAS(tail, oldP, q);
}

这样的话即使出现上文中讲述的更新 tail 前线程挂掉的情况,在进入到 do while(); 循环时,p 将会指定为无锁队列的实际尾部节点,从而CAS(p->next, nullptr, q)返回 true,结束循环,更新 tail 节点值。

出队操作

Dequeue() {
	do {
		Node* p = head->next;
		if(p == nullptr)
			return EMPTY;
	} while(CAS(head->next, p, p->next));
	return p->val;
}
//这里的head是一个哑头节点

模板无锁队列类的实现

有了如上的思想,那么就可以实现一个模板无锁队列类,代码如下:

// 定义一个链表实现队列
template <typename ElemType>
struct qnode // 链表节点
{
  struct qnode *_next;
  ElemType _data;
};
 
template <typename ElemType>
class Queue
{
private:
  struct qnode<ElemType> *volatile _head = NULL;  // 随着pop后指向的位置是不一样的, head不是固定的
  struct qnode<ElemType> *volatile _tail = NULL;
 
public:
  Queue() {
    _head = _tail = new qnode<ElemType>;
    _head->_next = NULL;
    _tail->_next = NULL;
    printf("Queue _head:%p\\n", _head);
  }
 
  void push_list(const ElemType& e) {
    struct qnode<ElemType>* p = new qnode<ElemType>;
    if (!p) return ; //p生成失败
    p->next = NULL;
    p->data = e;
    
    struct qnode<ElemType>* t = _tail;
    struct qnode<ElemType>* old_t = _tail;
    
    do {
      // 当非NULL的时候说明不是尾部,因此需要指向下一个节点
      while (t->next != NULL) {  
        t = t->next;
      }
    // 如果t->next为则null换为p
    } while (!__sync_bool_compare_and_swap(&t->next, NULL, p));
    // 如果尾部和原来的尾部相等,则换为p。
    __sync_bool_compare_and_swap(&_tail, old_t, p);
  }
  
  bool pop_list(ElemType& e) { //e作为传出参数,记录对头的元素值
    struct qnode<ElemType>* p = NULL;
    do {
      p = _head;
      if (p->next == NULL) return false;
      // 如果头部等于p,那么就指向p的下一个
    } while (!__sync_bool_compare_and_swap(&_head, p, p->next));
 
    e = p->data; //p为最初的head值,返回旧队头值
    delete p;
    p = NULL;
    return true;
  }
};

该模板类的实现,也即是记录队头和队尾节点,链表的头节点没有存放数据(同样是哑节点?):

  • push 到队尾的时候,先判断当前指针是否是队列的实际尾部,即使用CAS进行判断是不是,如果是,则 p->next == nullptr,将 p->next 替换为 q, 同时更新 tail 尾节点的值;如果不是,则在 do while(); 当中进行寻找实际队尾节点;
  • pop 出队头的时候,同样也是在 do while(); 循环当中判断队列是否非空,非空则将之前的节点值返回,并将其删除,更新 head 头节点的值;

2.2 使用数组实现环形无锁队列

此外,还可以使用数组实现,因为环形数组一经内存申请后,不会再涉及内存请求和释放:

  • 队列实现的形式是环形数组的形式;
  • 队列的元素的值,初始的时候是三种可能的值。HEAD、TAIL、EMPTY;
  • 数组一开始所有的元素都初始化为 EMPTY。有两个相邻的元素初始化为 HEAD 与 TAIL,代表着空队列;
  • 入队操作。假设数据 x 要入队列,定位 TAIL 的位置,使用 double-CAS 方法把 (TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到 (TAIL, EMPTY),则说明队列满了。
  • 出队操作。定位 HEAD 的位置,把 (HEAD, x) 更新成 (EMPTY, HEAD),并把 x 返回。同样需要注意,如果 x 是 TAIL,则说明队列为空。


3. ABA 问题及解决

简单的说就是线程A将当前值修改为10,此时线程B将值改为11,然后又有一个线程C把值又改为10,这样的话对于线程A来说取到的内存值和当前值是没变的,所以可以更新,但实际上是经过变化的,所以不符合实际逻辑的。

注意到CAS比较的是指针取内容得到的值,那么,假定某个线程准备出队操作,首先声明一个指向p指针head结点,接着要进行CAS操作,CAS(head,p,p->next)。假定在执行CAS操作之前,有个线程进行了入队操作,此时,head!=p,正常情形CAS(head,p,p->next)应该返回为false。但是,在CAS(head,p,p->next)之前,又有线程进行了入队操作,而入队的这个结点占用的内存恰恰是最开始的时候p所指向的内存,再恰恰经过一系列出队操作,使得当前头指针刚好指向刚刚入队操作的那块结点,最后,才开始,进行CAS操作。我们会发现原本应该返回为false的CAS操作,返回了true!(CAS比较的是内存地址所存放的值,==)。

解决ABA问题,可加入版本号这一控制信息,Java中有AtomicStampedReference类可以添加版本在比对内存值的时候加以区分。



4. 参考资料

  1. 基于CAS实现的无锁队列(多生产者多消费者)

  2. CAS无锁队列的原理及实现(附代码)

以上是关于C#中使用CAS实现无锁算法的主要内容,如果未能解决你的问题,请参考以下文章

算法CAS的实现和无锁编程

具体CAS操作实现(无锁算法)

CAS无锁策略

Java高并发程序设计--无锁

CAS无锁队列的实现

CAS算法