高性能队列Disruptor使用入门,原理和代码实现
Posted Leo Han
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高性能队列Disruptor使用入门,原理和代码实现相关的知识,希望对你有一定的参考价值。
网上有不少关于Disruptor的文章,这里聊聊自己的看法。
我总结Disruptor高性能的实现在于如下几点:
- 缓存行填充对齐
- 无锁CAS操作
- 环形缓冲区
- 异步并发消费
缓存行失效、伪共享
一般我们知道在我们程序运行的时候,内存访问都是比较快速的,但是CPU并不和内存打交道,而是和寄存器,当CPU要读取数据时,从内存中将数据加载到寄存器中,CPU的每个核心都有一个专属的寄存器,但是寄存器受限于成本,一般容量比较小,后来人们又在寄存器和内存中增加了所谓的L1,L2,L3三级缓存,当CPU要访问一个数据的时候,首先从L1缓存查找没有再从L2查找,没有在L3查找,如果L1,L2,L3缓存都查找不到,那么从内存加载。而这里的L1,L2,L3缓存的单位并不是我们常见的字节,而是Cache Line,缓存行,L1,L2,3缓存都是以缓存行为单位操作的,一般常见的缓存行大小为64字节,如果按照java中long类型的算的话,缓存行一次只能填充8个long类型数据。
一般L1,L2都是每个CPU核心独享的,L3是共享的,当CPU更改了缓存中的数据,缓存中的数据和内存中的数据就不一致了,如果其他CPU线程从内存读取数据,这时候数据就不正确了,该怎么处理呢 ?这时候就有了缓存一致性协议和内存屏障(具体内容大家可以百度,内存屏障又有写屏障-缓存更新后必须立马同步更新到主内存对所有线程可见,读屏障-读取之前清空缓存里的数据从主内存加载,相当于是每次都从主内存加载),简单来说,就是cpu0更新缓存并同时更新到内存中,cpu1如果缓存中有数据,那么该数据失效(缓存失效),需要从新从内存中读取。
网上经常说的ArrayBlockingQueue
的伪共享问题就是因为当多线程操作时,ArrayBlockingQueue的三个属性takeIndex、count、putIndex
这三个元素,并不是都一起更改的,当一个线程更改了上述属性中的一个(在缓存中),另外一个线程的这几个参数(在缓存中),这三个变量很容易放到一个缓存行里,由于发生了缓存失效,又得从新从内存中读取一次,这样导致多线程条件下,每次修改,其他线程又需要从内存中加载一次数据,即所谓的伪共享。
那么有什么方式解决呢 ?
一般可以通过填充方式,让字段位于不同的缓存行里。
在Disruptor中,其很重要的一个类Sequencer
就是采用了缓存行填充方式,我们看下这个类的实现:
public final class SingleProducerSequencer extends SingleProducerSequencerFields
protected long p1, p2, p3, p4, p5, p6, p7;
....
这里SingleProducerSequencer
填充了7个long类型,加上自己需要的value(long)正好8个long,大小为64B,正好一个缓存行。
2. 无锁CAS操作
我们知道多线程情况下为了对一些共享资源写入的保障,提供了锁机制,一般有悲观锁和乐观锁两种。悲观锁就是假设每次对数据操作的时候都会有多个线程在操作写入,因此需要独占数据,别人要想获取到数据必须阻塞等待当前线程释放锁。乐观锁是假设每次操作数据的时候被人不会修改数据,但是在更新数据的时候会判断一下数据有没有被修改,如果被修改了则重新获取操作,CAS是乐观锁
CAS是需要计算机底层硬件支持的
Disruptor中很多状态判断和操作都是基于CAS来操作的,比如sequence的更新
3. 环形缓冲区
Dirsuptor中数据存储采用的是一个环形缓冲区,这是一个数组,和一般的不同,这个数组只有一个指针,Disruptor中辅以各种Sequence来维护这个下标。
Disruptor中的这个所谓指针下标是一个long类型,一直增长,而环形缓冲区RingBuffer的大小由程序制定,为2的N次,我们设置RingBuffer的大小为buffer_size
,而这里的指针在Disruptor中为Sequencer
,里面维护了一个nextValue
,一般通过nextValue & (buffer_size-1)
来获取到nextValue对应在RingBuffer数组中的下标位置。
在Dirsuptor中消费者会维护一个消费者已经消费到的指针位置,如果消费者指针位置和nextValue 已经相等,表明当前RingBuffer中的数据已经消费完了。
另外Dirsuptor中RingBuffer在启动的时候就会初始化buffer_size个T类型的实例,后续设置的时候会通过Unsafe方式直接修改对应地址里面的内容
4. 异步并发消费
Disruptor中的消费都是异步的,在启动的时候,可以指定固定消费的线程数,在Disruptor启动后,这些线程就会启动并一直运行,其逻辑就是判断当前RingBuffer中是否还有未消费的数据,如果没有线程会等待(可以设置等待策略)。如果生产线程产生了新数据,会通知消费线程工作。
大致原理就是这样的,接下来我们看下Disruptor中具体实现。
一般使用Disruptor的大致流程如下:
- 创建待使用的数据类型定义:
public class MyEvent
private long value;
public long getValue()
return value;
public void setValue(long value)
this.value = value;
- 创建
EventFactory
,RingBuffer会调用这个工厂方法来填充RingBuffer中数组的所有元素
public class MyEventFactory implements EventFactory
@Override
public Object newInstance()
return new MyEvent();
- 创建消费处理逻辑,主要有两种
EventHandler
和WorkHandler
:
public class MyEventHandler implements EventHandler<MyEvent>
private String name;
public MyEventHandler(String name)
this.name = name;
@Override
public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception
System.out.println("name= "+name+" , value = "+event.getValue()+" , sequence = "+sequence+" , endOfBatch="+endOfBatch);
public class MyWorkHandler implements WorkHandler<MyEvent>
private String name;
public MyWorkHandler(String name)
this.name = name;
@Override
public void onEvent(MyEvent event) throws Exception
System.out.println("name= "+name+" , value = "+event.getValue());
- 创建消费者并组装:
public class MyDisruptorTest
public static void main(String[] args)
Executor executor = Executors.newCachedThreadPool();
MyEventFactory factory = new MyEventFactory();
int ringBufferSize = 1024;
Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(factory,ringBufferSize,
executor,ProducerType.MULTI, new BlockingWaitStrategy());
disruptor.handleEventsWith(new MyEventHandler("test-001"), new MyEventHandler("test-002"));
disruptor.handleEventsWithWorkerPool(new MyWorkHandler("test-001"), new MyWorkHandler("test-002"));
RingBuffer<MyEvent> ringBuffer = disruptor.start();
ByteBuffer buf = ByteBuffer.allocate(8);
for(long i=0;i<10;i++)
buf.putLong(0,i);
// 两种方式发布,一种是通过publishEvent,另外一种是直接设置ringBuffer对应数据然后publish
ringBuffer.publishEvent(new EventTranslatorOneArg<MyEvent,ByteBuffer>()
@Override
public void translateTo(MyEvent event, long sequence, ByteBuffer buf)
event.setValue(buf.getLong(0));
,buf);
// 设置ringBuffer的数据然后publish
// long next = ringBuffer.next();
// MyEvent event = ringBuffer.get(next);
// event.setValue(i);
// ringBuffer.publish(next);
buf.clear();
接下来我们看下RingBuffer中的实现,首先看Disruptr中构造入口:
// Disruptor.java
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor)
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), executor);
// RingBuffer.java
public static <E> RingBuffer<E> createMultiProducer(
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
而RingBuffer在启动的时候其父类RingBufferFields
中会对其数据的数组进行初始化:
RingBufferFields(
EventFactory<E> eventFactory,
Sequencer sequencer)
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1)
throw new IllegalArgumentException("bufferSize must not be less than 1");
if (Integer.bitCount(bufferSize) != 1)
throw new IllegalArgumentException("bufferSize must be a power of 2");
this.indexMask = bufferSize - 1;
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
private void fill(EventFactory<E> eventFactory)
for (int i = 0; i < bufferSize; i++)
entries[BUFFER_PAD + i] = eventFactory.newInstance();
在RingBuffer
中通过private final Object[] entries;
来保存数据。而在构造RingBuffer的时候通过EventFactory
将entries
数组的每个元素都会初始化。
另外在Disruptor中有两种消费者类型:
public enum ProducerType
SINGLE,
MULTI
即单生产者和多生产者模型。
然后在handleEventsWith
的时候会创建BatchEventProcessor
并返回一个EventHandlerGroup
handleEventsWithWorkerPool
类似:
public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors)
for (final EventProcessor processor : processors)
consumerRepository.add(processor);
final Sequence[] sequences = new Sequence[processors.length];
for (int i = 0; i < processors.length; i++)
sequences[i] = processors[i].getSequence();
ringBuffer.addGatingSequences(sequences);
return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors));
EventHandlerGroup<T> createWorkerPool(
final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
consumerRepository.add(workerPool, sequenceBarrier);
final Sequence[] workerSequences = workerPool.getWorkerSequences();
updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
return createWorkerPool(new Sequence[0], workHandlers);
EventHandlerGroup<T> createWorkerPool(
final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
consumerRepository.add(workerPool, sequenceBarrier);
final Sequence[] workerSequences = workerPool.getWorkerSequences();
updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
这块是消费者异步消费的入口。
当执行Disruptor.start
的时候就会启动消费线程:
public RingBuffer<T> start()
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository)
consumerInfo.start(executor);
return ringBuffer;
这里的consumerRepository的consumerInfo就是我们之前加入的各种处理在consumerRepository
再次封装成EventProcessorInfo
和WorkerPoolInfo
,我们看下WorkerPoolInfo
怎么实现的:
public void start(Executor executor)
workerPool.start(executor);
public WorkerPool(
final EventFactory<T> eventFactory,
final ExceptionHandler<? super T> exceptionHandler,
final WorkHandler<? super T>... workHandlers)
ringBuffer = RingBuffer.createMultiProducer(eventFactory, 1024, new BlockingWaitStrategy());
final SequenceBarrier barrier = ringBuffer.newBarrier();
final int numWorkers = workHandlers.length;
workProcessors = new WorkProcessor[numWorkers];
for (int i = 0; i < numWorkers; i++)
workProcessors[i] = new WorkProcessor<T>(
ringBuffer,
barrier,
workHandlers[i],
exceptionHandler,
workSequence);
ringBuffer.addGatingSequences(getWorkerSequences());
public RingBuffer<T> start(final Executor executor)
if (!started.compareAndSet(false, true))
throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
final long cursor = ringBuffer.getCursor();
workSequence.set(cursor);
for (WorkProcessor<?> processor : workProcessors)
processor.getSequence().set(cursor);
executor.execute(processor);
return ringBuffer;
可以看到,对于我们传入的WorkHandler
,WorkerPool中对其封装成了WorkProcessor
,start的时候,会启动WorkProcessor
,其是Runnable的实现类,run方法逻辑如下
public void run()
if (!running.compareAndSet(false, true))
throw new IllegalStateException("Thread is already running");
sequenceBarrier.clearAlert();
notifyStart();
boolean processedSequence = true;
long cachedAvailableSequence = Long.MIN_VALUE;
long nextSequence = sequence.get();
T event = null;
while (true)
try
if (processedSequence)
processedSequence = false;
do
nextSequence = workSequence.get() + 1L;
sequence.set(nextSequence - 1L);
while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
if (cachedAvailableSequence >= nextSequence)
event = ringBuffer.get(nextSequence);
workHandler.onEvent(event);
processedSequence = true;
else
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
....
notifyShutdown();
running.set(false);
这里的workSequence
是多个线程共享一个的,可以看到每组WorkHandler
中每条消息只会被一个WorkHandler
消费,这里的run就是一个while死循环,循环大致逻辑是:
获取一个当前线程可以处理的nextSequence指针,如果生产者已经生产的数据的指针即cachedAvailableSequence>nextSequence,表明当前有数据可以消费,执行workHandler.onEvent的逻辑
如果当前没有可以消费的数据,那么会通过SequenceBarrier.waitFor
来等待,我们看下其默认实现ProcessingSequenceBarrier
:
public long waitFor(分布式技术专题线程间的高性能消息框架-深入浅出Disruptor的使用和原理