深入理解Disruptor

Posted JavaEdge.

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解Disruptor相关的知识,希望对你有一定的参考价值。

Disruptor通过缓存行填充,利用CPU高速缓存,只是Disruptor“快”的一个因素,快的另一因素是“无锁”,尽可能发挥CPU本身的高速处理性能。

1 缓慢的锁

Disruptor作为一个高性能的生产者-消费者队列系统,核心就是通过RingBuffer实现一个无锁队列。

Jdk像LinkedBlockingQueue队列库,比Disruptor RingBuffer慢很多。

1.1 链表数据在内存布局对高速缓存不友好

RingBuffer使用数组:

1.2 锁依赖

生产者-消费者模式里,可能有多个消费者,也可能多个生产者。
多个生产者都要往队尾指针添加新任务,产生多线程竞争。于是,做这事时,生产者就要拿到对队尾的锁。同样多个消费者去消费队头时,也就产生竞争。同样消费者也要拿到锁。

那只有一个生产者或一个消费者,是不是就没锁的竞争问题?No!生产者-消费者模式下,消费者比生产者快。不然,队列会积压,任务越堆越多:

  • 越来越多的任务没能及时完成
  • 内存也放不下

虽然生产者-消费者模型下,都有队列作为缓冲区,但大部分情况下,这缓冲区空。即使只有一个生产者和一个消费者,这生产者指向的队尾和消费者指向的队头是同一节点。于是,这两个生产者和消费者之间一样产生锁竞争。

在LinkedBlocking Queue锁机制通过ReentrantLock实现。用Java在JVM上直接实现的加锁机制,由JVM进行裁决。这锁的争夺,会把没有拿到锁的线程挂起等待,也需经过一次上下文切换(Context Switch)。

这上下文切换要做的和异常和中断里的一样。上下文切换过程,要把当前执行线程的寄存器等信息,保存到线程栈。即已加载到高速缓存的指令或数据,又回到主内存,会进步拖慢性能。

Disruptor的Benchmark测试:把一个long类型counter,从0自增到5亿

  • 一种方式没任何锁
  • 另外一个方式每次自增时都取一个锁

分别207毫秒和9603毫秒,性能差近50倍。

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class LockBenchmark 


    public static void runIncrement() 
        long counter = 0;
        long max = 500000000L;
        long start = System.currentTimeMillis();
        while (counter < max) 
            counter++;
        
        long end = System.currentTimeMillis();
        System.out.println("Time spent is " + (end-start) + "ms without lock");
    


    public static void runIncrementWithLock() 
        Lock lock = new ReentrantLock();
        long counter = 0;
        long max = 500000000L;
        long start = System.currentTimeMillis();
        while (counter < max) 
            if (lock.tryLock())
                counter++;
                lock.unlock();
            
        
        long end = System.currentTimeMillis();
        System.out.println("Time spent is " + (end-start) + "ms with lock");
    


    public static void main(String[] args) 
        runIncrement();
        runIncrementWithLock();
    

加锁和不加锁自增counter
Time spent is 207ms without lock
Time spent is 9603ms with lock
性能差出将近10倍

2 无锁的RingBuffer

加锁很慢,所以Disruptor“无锁”,即没有os层的锁。
Disruptor还利用CPU硬件支持的指令,CAS,Intel CPU对应指令 cmpxchg。

和直接在链表的头和尾加锁不同,RingBuffer创建一个Sequence对象,指向当前的RingBuffer的头和尾。这头和尾的标识不是通过指针实现,而是通过序号,类名叫Sequence。

RingBuffer中进行生产者和消费者之间的资源协调,是对比序号。
当Pro想往队列加新数据,它会把当前Pro的Sequence的序号,加上需要加入的新数据的数量,然后和实际的消费者所在的位置对比,看队列里是否有足够空间加入这些数据,而不会覆盖消费者还没处理完的数据。

Sequence就是通过CAS,即UNSAFE.compareAndSwapLong:

 public boolean compareAndSet(final long expectedValue, final long newValue)
	    
	        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
	    


public long addAndGet(final long increment)
    
        long currentValue;
        long newValue;


        do
        
            currentValue = get();
            newValue = currentValue + increment;
        
        while (!compareAndSet(currentValue, newValue));


        return newValue;

Sequence源码中的addAndGet,若CAS失败,会不断忙等待重试。
CAS不是基础库函数,也不是os实现的一个系统调用,而是一个CPU硬件支持的机器指令。Intel CPU的cmpxchg指令,compxchg [ax] (隐式参数,EAX累加器), [bx] (源操作数地址), [cx] (目标操作数地址):

  • 第一个操作数不在指令里面出现,是一个隐式的操作数,也就是EAX累加寄存器里面的值
  • 第二个操作数就是源操作数,并且指令会对比这个操作数和上面的累加寄存器里面的值

若值相同,CPU会把ZF(条件码寄存器里零标志位的值)置1,再把第三个操作数(即目标操作数)设置到源操作数的地址。

不相等,就会把源操作数里的值,设置到累加器寄存器。

对应伪代码:

IF [ax]< == [bx] THEN [ZF] = 1, [bx] = [cx]
                 ELSE [ZF] = 0, [ax] = [bx] 

单指令是原子的,即CAS时,无需再加锁,直接调用。无锁,CPU就像在赛道上行驶,不会遇到需上下文切换红灯而停下来。虽会遇到CAS这样复杂机器指令,就好像赛道上会有U型弯,不过不用完全停等待,CPU运行起来仍快得多。

3 CAS到底多快

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class LockBenchmark 


    public static void runIncrementAtomic()
    
        AtomicLong counter = new AtomicLong(0);
        long max = 500000000L;
        long start = System.currentTimeMillis();
        while (counter.incrementAndGet() < max) 
        
        long end = System.currentTimeMillis();
        System.out.println("Time spent is " + (end-start) + "ms with cas");
    


    public static void main(String[] args) 
        runIncrementAtomic();
    



Time spent is 3867ms with cas

incrementAndGet最终到CPU指令层面,就是CAS操作。它所花费时间,虽比没任何锁的操作慢一个数量级,但比使用ReentrantLock这样的操作系统锁的机制,还是减少一半时间。

4 总结

Java基础库里面的BlockingQueue,都要通过显示地加锁来保障生产者之间、消费者之间,乃至生产者和消费者之间,不会发生锁冲突的问题。

但加锁会大大拖慢性能。获取锁时,CPU没有执行计算相关指令,而要等待os或JVM进行锁竞争裁决。那些没有拿到锁而被挂起等待的线程,则需上下文切换。这上下文切换,会把挂起线程的寄存器里的数据放到线程的程序栈。即加载到高速缓存里面的数据也失效了,程序就变得更慢。

RingBuffer采用无锁方案,通过CAS进行序号自增和对比,使CPU无需获取os锁。而能继续顺序执行CPU指令。无上下文切换、os锁,程序就快。不过因为采用CAS忙等待(Busy-Wait),会使得CPU始终满负荷运转,消耗更多电,小缺点。

以上是关于深入理解Disruptor的主要内容,如果未能解决你的问题,请参考以下文章

系统性能典型案例分析:高性能队列Disruptor,一文深入理解

深入浅出计算机组成原理:理解Disruptor(上)-带你体会CPU高速缓存的风驰电掣(第54讲)

Disruptor深入解读

disruptor深入剖析

分布式技术专题线程间的高性能消息框架-深入浅出Disruptor的使用和原理

并发框架Disruptor学习入门