高性能队列Disruptor使用入门,原理和代码实现

Posted Leo Han

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高性能队列Disruptor使用入门,原理和代码实现相关的知识,希望对你有一定的参考价值。

网上有不少关于Disruptor的文章,这里聊聊自己的看法。
我总结Disruptor高性能的实现在于如下几点:

  • 缓存行填充对齐
  • 无锁CAS操作
  • 环形缓冲区
  • 异步并发消费

缓存行失效、伪共享

一般我们知道在我们程序运行的时候,内存访问都是比较快速的,但是CPU并不和内存打交道,而是和寄存器,当CPU要读取数据时,从内存中将数据加载到寄存器中,CPU的每个核心都有一个专属的寄存器,但是寄存器受限于成本,一般容量比较小,后来人们又在寄存器和内存中增加了所谓的L1,L2,L3三级缓存,当CPU要访问一个数据的时候,首先从L1缓存查找没有再从L2查找,没有在L3查找,如果L1,L2,L3缓存都查找不到,那么从内存加载。而这里的L1,L2,L3缓存的单位并不是我们常见的字节,而是Cache Line,缓存行,L1,L2,3缓存都是以缓存行为单位操作的,一般常见的缓存行大小为64字节,如果按照java中long类型的算的话,缓存行一次只能填充8个long类型数据。
一般L1,L2都是每个CPU核心独享的,L3是共享的,当CPU更改了缓存中的数据,缓存中的数据和内存中的数据就不一致了,如果其他CPU线程从内存读取数据,这时候数据就不正确了,该怎么处理呢 ?这时候就有了缓存一致性协议和内存屏障(具体内容大家可以百度,内存屏障又有写屏障-缓存更新后必须立马同步更新到主内存对所有线程可见,读屏障-读取之前清空缓存里的数据从主内存加载,相当于是每次都从主内存加载),简单来说,就是cpu0更新缓存并同时更新到内存中,cpu1如果缓存中有数据,那么该数据失效(缓存失效),需要从新从内存中读取。

网上经常说的ArrayBlockingQueue的伪共享问题就是因为当多线程操作时,ArrayBlockingQueue的三个属性takeIndex、count、putIndex这三个元素,并不是都一起更改的,当一个线程更改了上述属性中的一个(在缓存中),另外一个线程的这几个参数(在缓存中),这三个变量很容易放到一个缓存行里,由于发生了缓存失效,又得从新从内存中读取一次,这样导致多线程条件下,每次修改,其他线程又需要从内存中加载一次数据,即所谓的伪共享。
那么有什么方式解决呢 ?
一般可以通过填充方式,让字段位于不同的缓存行里。
在Disruptor中,其很重要的一个类Sequencer就是采用了缓存行填充方式,我们看下这个类的实现:

public final class SingleProducerSequencer extends SingleProducerSequencerFields

    protected long p1, p2, p3, p4, p5, p6, p7;
	....

这里SingleProducerSequencer填充了7个long类型,加上自己需要的value(long)正好8个long,大小为64B,正好一个缓存行。

2. 无锁CAS操作

我们知道多线程情况下为了对一些共享资源写入的保障,提供了锁机制,一般有悲观锁和乐观锁两种。悲观锁就是假设每次对数据操作的时候都会有多个线程在操作写入,因此需要独占数据,别人要想获取到数据必须阻塞等待当前线程释放锁。乐观锁是假设每次操作数据的时候被人不会修改数据,但是在更新数据的时候会判断一下数据有没有被修改,如果被修改了则重新获取操作,CAS是乐观锁
CAS是需要计算机底层硬件支持的
Disruptor中很多状态判断和操作都是基于CAS来操作的,比如sequence的更新

3. 环形缓冲区

Dirsuptor中数据存储采用的是一个环形缓冲区,这是一个数组,和一般的不同,这个数组只有一个指针,Disruptor中辅以各种Sequence来维护这个下标。
Disruptor中的这个所谓指针下标是一个long类型,一直增长,而环形缓冲区RingBuffer的大小由程序制定,为2的N次,我们设置RingBuffer的大小为buffer_size,而这里的指针在Disruptor中为Sequencer,里面维护了一个nextValue,一般通过nextValue & (buffer_size-1)来获取到nextValue对应在RingBuffer数组中的下标位置。
在Dirsuptor中消费者会维护一个消费者已经消费到的指针位置,如果消费者指针位置和nextValue 已经相等,表明当前RingBuffer中的数据已经消费完了。
另外Dirsuptor中RingBuffer在启动的时候就会初始化buffer_size个T类型的实例,后续设置的时候会通过Unsafe方式直接修改对应地址里面的内容

4. 异步并发消费

Disruptor中的消费都是异步的,在启动的时候,可以指定固定消费的线程数,在Disruptor启动后,这些线程就会启动并一直运行,其逻辑就是判断当前RingBuffer中是否还有未消费的数据,如果没有线程会等待(可以设置等待策略)。如果生产线程产生了新数据,会通知消费线程工作。

大致原理就是这样的,接下来我们看下Disruptor中具体实现。
一般使用Disruptor的大致流程如下:

  1. 创建待使用的数据类型定义:
public class MyEvent 
    private long value;
    public long getValue() 
        return value;
    
    public void setValue(long value) 
        this.value = value;
    

  1. 创建EventFactory,RingBuffer会调用这个工厂方法来填充RingBuffer中数组的所有元素
public class MyEventFactory implements EventFactory 
    @Override
    public Object newInstance() 
        return new MyEvent();
    

  1. 创建消费处理逻辑,主要有两种EventHandlerWorkHandler:
public class MyEventHandler implements EventHandler<MyEvent> 
    private String name;
    public MyEventHandler(String name)
        this.name = name;
    
    @Override
    public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception 
        System.out.println("name= "+name+" , value = "+event.getValue()+" , sequence = "+sequence+" , endOfBatch="+endOfBatch);
    

public class MyWorkHandler implements WorkHandler<MyEvent> 
    private String name;
    public MyWorkHandler(String name)
        this.name = name;
    

    @Override
    public void onEvent(MyEvent event) throws Exception 
        System.out.println("name= "+name+" , value = "+event.getValue());
    

  1. 创建消费者并组装:
public class MyDisruptorTest 
    public static void main(String[] args) 
        Executor executor = Executors.newCachedThreadPool();
        MyEventFactory factory = new MyEventFactory();
        int ringBufferSize = 1024;
        Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(factory,ringBufferSize,
                executor,ProducerType.MULTI, new BlockingWaitStrategy());
        disruptor.handleEventsWith(new MyEventHandler("test-001"), new MyEventHandler("test-002"));
        disruptor.handleEventsWithWorkerPool(new MyWorkHandler("test-001"), new MyWorkHandler("test-002"));
        RingBuffer<MyEvent> ringBuffer = disruptor.start();

        ByteBuffer buf = ByteBuffer.allocate(8);

        for(long i=0;i<10;i++)
        
            buf.putLong(0,i);
			// 两种方式发布,一种是通过publishEvent,另外一种是直接设置ringBuffer对应数据然后publish
            ringBuffer.publishEvent(new EventTranslatorOneArg<MyEvent,ByteBuffer>()
                @Override
                public void translateTo(MyEvent event, long sequence, ByteBuffer buf) 
                    event.setValue(buf.getLong(0));
                
            ,buf);
// 设置ringBuffer的数据然后publish			
//            long next = ringBuffer.next();
//            MyEvent event = ringBuffer.get(next);
//            event.setValue(i);
//            ringBuffer.publish(next);

            buf.clear();
        
    

接下来我们看下RingBuffer中的实现,首先看Disruptr中构造入口:

// Disruptor.java
 public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor)
    
        this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), executor);
    

// RingBuffer.java
public static <E> RingBuffer<E> createMultiProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    
        MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<E>(factory, sequencer);
    

而RingBuffer在启动的时候其父类RingBufferFields中会对其数据的数组进行初始化:

RingBufferFields(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1)
        
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        
        if (Integer.bitCount(bufferSize) != 1)
        
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        

        this.indexMask = bufferSize - 1;
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
    

    private void fill(EventFactory<E> eventFactory)
    
        for (int i = 0; i < bufferSize; i++)
        
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        
    

RingBuffer中通过private final Object[] entries;来保存数据。而在构造RingBuffer的时候通过EventFactoryentries数组的每个元素都会初始化。
另外在Disruptor中有两种消费者类型:

public enum ProducerType

    SINGLE,
    MULTI

即单生产者和多生产者模型。

然后在handleEventsWith的时候会创建BatchEventProcessor并返回一个EventHandlerGroup
handleEventsWithWorkerPool类似:

public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors)
    
        for (final EventProcessor processor : processors)
        
            consumerRepository.add(processor);
        

        final Sequence[] sequences = new Sequence[processors.length];
        for (int i = 0; i < processors.length; i++)
        
            sequences[i] = processors[i].getSequence();
        

        ringBuffer.addGatingSequences(sequences);

        return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors));
    
EventHandlerGroup<T> createWorkerPool(
        final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
    
        final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
        final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
        consumerRepository.add(workerPool, sequenceBarrier);
        final Sequence[] workerSequences = workerPool.getWorkerSequences();

        updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
        return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
    
public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
    
        return createWorkerPool(new Sequence[0], workHandlers);
    
EventHandlerGroup<T> createWorkerPool(
        final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
    
        final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
        final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);


        consumerRepository.add(workerPool, sequenceBarrier);

        final Sequence[] workerSequences = workerPool.getWorkerSequences();

        updateGatingSequencesForNextInChain(barrierSequences, workerSequences);

        return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
    

这块是消费者异步消费的入口。
当执行Disruptor.start的时候就会启动消费线程:

   public RingBuffer<T> start()
    
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository)
        
            consumerInfo.start(executor);
        

        return ringBuffer;
    

这里的consumerRepository的consumerInfo就是我们之前加入的各种处理在consumerRepository再次封装成EventProcessorInfoWorkerPoolInfo,我们看下WorkerPoolInfo怎么实现的:

public void start(Executor executor)
    
        workerPool.start(executor);
    
public WorkerPool(
        final EventFactory<T> eventFactory,
        final ExceptionHandler<? super T> exceptionHandler,
        final WorkHandler<? super T>... workHandlers)
    
        ringBuffer = RingBuffer.createMultiProducer(eventFactory, 1024, new BlockingWaitStrategy());
        final SequenceBarrier barrier = ringBuffer.newBarrier();
        final int numWorkers = workHandlers.length;
        workProcessors = new WorkProcessor[numWorkers];

        for (int i = 0; i < numWorkers; i++)
        
            workProcessors[i] = new WorkProcessor<T>(
                ringBuffer,
                barrier,
                workHandlers[i],
                exceptionHandler,
                workSequence);
        

        ringBuffer.addGatingSequences(getWorkerSequences());
    
public RingBuffer<T> start(final Executor executor)
    
        if (!started.compareAndSet(false, true))
        
            throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
        

        final long cursor = ringBuffer.getCursor();
        workSequence.set(cursor);

        for (WorkProcessor<?> processor : workProcessors)
        
            processor.getSequence().set(cursor);
            executor.execute(processor);
        

        return ringBuffer;
    

可以看到,对于我们传入的WorkHandler,WorkerPool中对其封装成了WorkProcessor,start的时候,会启动WorkProcessor,其是Runnable的实现类,run方法逻辑如下

public void run()
    
        if (!running.compareAndSet(false, true))
        
            throw new IllegalStateException("Thread is already running");
        
        sequenceBarrier.clearAlert();
        notifyStart();
        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        while (true)
        
            try
            
                if (processedSequence)
                
                    processedSequence = false;
                    do
                    
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                
                if (cachedAvailableSequence >= nextSequence)
                
                    event = ringBuffer.get(nextSequence);
                    workHandler.onEvent(event);
                    processedSequence = true;
                
                else
                
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                
            
            ....
        
        notifyShutdown();
        running.set(false);
    

这里的workSequence是多个线程共享一个的,可以看到每组WorkHandler中每条消息只会被一个WorkHandler消费,这里的run就是一个while死循环,循环大致逻辑是:
获取一个当前线程可以处理的nextSequence指针,如果生产者已经生产的数据的指针即cachedAvailableSequence>nextSequence,表明当前有数据可以消费,执行workHandler.onEvent的逻辑
如果当前没有可以消费的数据,那么会通过SequenceBarrier.waitFor来等待,我们看下其默认实现ProcessingSequenceBarrier:

public long waitFor(分布式技术专题线程间的高性能消息框架-深入浅出Disruptor的使用和原理

高性能并发队列Disruptor

高性能并发队列Disruptor

disruptor笔记之一:快速入门

并发框架Disruptor学习入门

优化技术专题「线程间的高性能消息框架」再次细节领略Disruptor的底层原理和优势分析