disruptor
Posted holoyong
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了disruptor相关的知识,希望对你有一定的参考价值。
disruptor提供了线程间消息通信机制,通常会拿它和jdk中的blockingQueue作比较,blockingQueue使用了ReentrantLock。
下例的逻辑是,生产500个ValueEvent,先后由toDbHandler和businessHandler消费事件。
normalTest使用的是低级接口,DSLWizardTest使用的是高级接口,后者明显要简洁一些。
1 class ValueEvent { 2 private long value; 3 4 public long getValue() { 5 return value; 6 } 7 8 public void setValue(final long value) { 9 this.value = value; 10 } 11 12 @Override 13 public String toString() { 14 return "ValueEvent{" + 15 "value=" + value + 16 \'}\'; 17 } 18 } 19 // 生产者的线程工厂 20 class SimpleThreadFactory implements java.util.concurrent.ThreadFactory { 21 @Override 22 public Thread newThread(Runnable r) { 23 return new Thread(r, "simpleThread"); 24 } 25 }; 26 27 // RingBuffer生产工厂,初始化RingBuffer的时候使用 28 class SimpleEventFactory implements EventFactory<ValueEvent> { 29 @Override 30 public ValueEvent newInstance() { 31 return new ValueEvent(); 32 } 33 }; 34 35 class JournalEventHandler implements EventHandler<ValueEvent> { 36 public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception { 37 // process a new event. 38 System.out.println("to db: " + event); 39 } 40 }; 41 class ReplicationEventHandler implements EventHandler<ValueEvent> { 42 public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception { 43 // process a new event. 44 System.out.println("replication: " + event); 45 } 46 }; 47 class ApplicationEventHandler implements EventHandler<ValueEvent> { 48 public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception { 49 // process a new event. 50 System.out.println(event); 51 // System.out.println("sequence: " + sequence); 52 // System.out.println("end of batch: " + endOfBatch); 53 } 54 };
1 public class TestDisruptor { 2 3 // 指定RingBuffer的大小 4 private int bufferSize = 16; 5 // 阻塞策略 6 BlockingWaitStrategy strategy = new BlockingWaitStrategy(); 7 8 @Test 9 public void normalTest() throws InterruptedException { 10 RingBuffer<ValueEvent> ringBuffer = 11 RingBuffer.createSingleProducer(new SimpleEventFactory(), bufferSize, strategy); 12 13 SequenceBarrier barrier = ringBuffer.newBarrier(); 14 BatchEventProcessor<ValueEvent> journalEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, new JournalEventHandler()); 15 BatchEventProcessor<ValueEvent> replicationEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, new ReplicationEventHandler()); 16 17 barrier = ringBuffer.newBarrier(journalEventProcessor.getSequence(), replicationEventProcessor.getSequence()); 18 BatchEventProcessor<ValueEvent> businessEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, new ApplicationEventHandler()); 19 20 21 ringBuffer.addGatingSequences(businessEventProcessor.getSequence()); 22 23 ExecutorService executor = Executors.newCachedThreadPool(new SimpleThreadFactory()); 24 // Each EventProcessor can run on a separate thread 25 executor.submit(businessEventProcessor); 26 executor.submit(journalEventProcessor); 27 executor.submit(replicationEventProcessor); 28 29 for (int i = 0; i < 500; i++) { 30 // Publishers claim events in sequence 31 long sequence = ringBuffer.next(); 32 try { 33 ValueEvent event = ringBuffer.get(sequence); 34 event.setValue(i); // this could be more complex with multiple fields 35 } finally { 36 ringBuffer.publish(sequence); 37 } 38 // make the event available to EventProcessors 39 Thread.sleep(10); 40 } 41 Thread.sleep(100); 42 } 43 44 /** 45 * DSLWizard 46 * 47 * @throws Exception 48 */ 49 @Test 50 public void dslWizardTest() throws Exception { 51 // 创建disruptor,采用单生产者模式 52 Disruptor<ValueEvent> disruptor = 53 new Disruptor<>(new SimpleEventFactory(), bufferSize, new SimpleThreadFactory(), ProducerType.SINGLE, strategy); 54 // 设置EventHandler 55 disruptor.handleEventsWith(new JournalEventHandler(), new ReplicationEventHandler()) 56 .then(new ApplicationEventHandler()); 57 // 启动disruptor的线程 58 disruptor.start(); 59 60 EventTranslator<ValueEvent> eventTranslator = new EventTranslator<ValueEvent>() { 61 private AtomicInteger value = new AtomicInteger(0); 62 63 @Override 64 public void translateTo(ValueEvent event, long sequence) { 65 event.setValue(value.getAndIncrement()); 66 } 67 }; 68 for (int l = 0; l < 500; l++) { 69 disruptor.publishEvent(eventTranslator); 70 Thread.sleep(10); 71 } 72 } 73 }
上例实现有一个问题,ApplicationConsumer(ApplicationEventHandler)非常耗时时会批量阻塞Producer的生产,比如JournalConsumer、ReplicationConsumer可以很快处理16个事件(因为此例中RingBuffer大小设为16),ApplicationConsumer慢速处理导致生产者迟迟不能生产,直至ApplicationConsumer将RingBuffer全部消费完。
1 while (true) 2 { 3 try 4 { 5 //获取(最大)可消费序号 6 final long availableSequence = sequenceBarrier.waitFor(nextSequence); 7 if (batchStartAware != null) 8 { 9 batchStartAware.onBatchStart(availableSequence - nextSequence + 1); 10 } 11 12 while (nextSequence <= availableSequence) 13 { 14 event = dataProvider.get(nextSequence); 15 eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); 16 nextSequence++; 17 } 18 //只有批量处理完成后才设置sequence,而producer在生产时需要通过RingBuffer的Sequencer.next获得可分配序号, 19 //Sequencer.next又依赖于最低级的Consumer(EventProcesser)的Sequence, 20 //即只有最低级的Consumer的Sequence向前移动了,producer才能继续生产。 21 sequence.set(availableSequence); 22 } 23 catch (final TimeoutException e) 24 { 25 //...... 26 } 27 }
上述实现的功能可以使用下图表示,ApplicationConsumer(ApplicationEventHandler)依赖于JournalConsumer(JournalEventHandler)和ReplicationConsumer(ReplicationEventHandler),这种依赖关系体现在ApplicationConsumer的SequenceBarrier依赖ReplicationConsumer和JournalConsumer的Sequence(三者的SequenceBarrier都依赖于RingBuffer的Sequence)。
我们看到Sequencer依赖于ApplicationConsumer的Sequence,ApplicationConsumer作为最后一级消费者,其消费速度如果过慢,就会阻塞Sequencer产生下一个序号的速度(RingBuffer.next()调用了Sequencer.next())。
Sequencer的next使用CAS分配序号,避免了锁开销,当RingBuffer满时,不断循环判断是否可以分配序号,如果不可分配则LockSupport.parkNanos(1)。
每个Consumer都有一个独立的Sequence,Consumers和Sequencer共享一个waitStrategy,Consumer在取事件时,会通过waitStrategy的waitFor获得可用序号。
waitStrategy有不同的策略,比如BlockingWaitStrategy、YieldingWaitStrategy等,这些策略决定了在无可用序号时线程的等待策略,比如BlockingWaitStrategy会阻塞(涉及锁),YieldingWaitStrategy会不断循环并yield。
disruptor中的几个重要组件:
RingBuffer:存放元素的环形容器;
Sequence:序号,类似AtomicLong,考虑了缓存行带来的伪共享(false sharing),对value进行了填充,用于并发获取RingBuffer中可存入元素的序号;
Sequencer:有两个实现类,SingleProducerSequencer和MultiProducerSequencer,这是一个非常关键的组件,next()会得到RingBuffer的下一个可分配序号,然后判断该序号是否可以被分配,判断依据就是最低级的Consumer的消费速度是否跟上了生产者的生产速度(通过最低级的Sequence与待分配序号比较即可判断);
SequenceBarrier:可以理解为Sequence的屏障,Sequence不能跨过该屏障取序号;
EventHandler:消费者的消费逻辑;
EventProccessor:消费者,循环通过SequenceBarrier.waitFor得到可用(可消费)序号,回调EventHandler的onEvent消费;
WaitStrategy:Consumer(EventProcessor)的等待策略;
Producer:用户逻辑;
参考:https://github.com/LMAX-Exchange/disruptor/wiki/Introduction
http://lmax-exchange.github.io/disruptor/
以上是关于disruptor的主要内容,如果未能解决你的问题,请参考以下文章