Disruptor极速队列

Posted gaojy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Disruptor极速队列相关的知识,希望对你有一定的参考价值。

参考:http://www.cnblogs.com/haiq/p/4112689.html

Disruptor 是线程内通信框架,用于线程里共享数据。LMAX 创建Disruptor作为可靠消息架构的一部分并将它设计成一种在不同组件中共享数据非常快的方法。

Disruptor能做什么

  • 同一个“事件”可以有多个消费者,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图);  主要基于内存屏障,如下图所示:
  • 预分配用于存储事件内容的内存空间;
  • 针对极高的性能目标而实现的极度优化和无锁的设计;

Disruptor核心概念

 

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

    • Ring Buffer
      如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
    • Sequence  Disruptor
      通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
      (注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。
    • Sequencer 
      Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
    • Sequence Barrier
      用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
    • Wait Strategy
      定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
    • Event
      在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
    • EventProcessor
      EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
    • EventHandler
      Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
    • Producer
      即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

Disruptor关键特性

写入ringBuffer

参考:http://ifeve.com/disruptor-writing-ringbuffer/

Ring Buffer是基于数组,而不是链表,不会产生内存回收的性能消耗。

1)Disruptor 由消费者负责通知ProducerBarrier,处理到了哪个序列号,而不是 Ring Buffer。所以,如果我们想确定我们没有让 Ring Buffer 重叠,需要检查所有的消费者们都读到了哪里。

2)现在生产者想要写入 Ring Buffer 中序号 3 占据的节点,因为它是 Ring Buffer 当前游标的下一个节点。但是 ProducerBarrier 明白现在不能写入,因为有一个消费者正在占用它。所以,ProducerBarrier 停下来自旋 (spins),等待,直到那个消费者离开。

 

提交数据及多生产者

Disruptor案例

package com.disruptor;
/**
 * 定义事件  
 * 事件event就是通过disruptor进行交换的数据类型
 * @author gaojiay
 *
 */
public class LongEvent {

  private long value;
  public void set(long value){
    this.value = value;
  }
  public long get() {
    // TODO Auto-generated method stub
    return value;
  }
}
View Code
package com.disruptor;

import com.lmax.disruptor.EventFactory;
/**
 * 事件工厂(Event Factory)定义了实例化定义的事件(Event);
 * Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例;
 * 
 * 个 Event 实例实际上被用作一个“数据槽”,发布者发布前,
 * 先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,
 * 之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event
 *  实例并从中读取数据。
 *
 */
public class LongEventFactory  implements EventFactory<LongEvent>{

  @Override
  public LongEvent newInstance() {
    // TODO Auto-generated method stub
    return new LongEvent();
  }

}
View Code
package com.disruptor;

import com.lmax.disruptor.EventHandler;

/**
 * 定义事件处理的具体实现 消费者,也就是事件处理器
 * 
 * @author gaojiay
 *
 */
public class LongEventHandler implements EventHandler<LongEvent> {

  private String name;

  @Override
  public void onEvent(LongEvent event, long sequence, boolean endofBatch) throws Exception {
    System.out.println("CoustmerEvent_"+name+":" + event.get());

  }

  public LongEventHandler() {
    super();
  }

  public LongEventHandler(String name) {
    this();
    this.name = name;

  }
}
View Code

main

package com.disruptor;
/**
 * ringbuffer的写入  http://ifeve.com/disruptor-writing-ringbuffer/
 */
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
//http://www.cnblogs.com/haiq/p/4112689.html
public class main {
 //static  AtomicLong data = new AtomicLong(1000);
  static long count = 100;
  public static void main(String[] args) {
    //Disruptor 通过 java.util.concurrent.ExecutorService 提供的线程来触发 Consumer 的事件处理。例如:
    ExecutorService executor = Executors.newCachedThreadPool();
    /*BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现;
    SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;
    YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。*/
    WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
    WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
    WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
    
    
    //启动
    EventFactory<LongEvent> eventFactory = new LongEventFactory();
    int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方;
            
    Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,
                    ringBufferSize, executor, ProducerType.SINGLE,
                    new YieldingWaitStrategy());
            
    EventHandler<LongEvent> eventHandler1 = new LongEventHandler("1");  //消费者1
    EventHandler<LongEvent> eventHandler2 = new LongEventHandler("2");  //消费者2
    disruptor.handleEventsWith(eventHandler1,eventHandler2);
            
    disruptor.start();
    // 发布事件;  

/* 发布方式一
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    for(int i= 0;i<100;i++){
    long sequence = ringBuffer.next();//请求下一个事件序号;
        
    try {
        LongEvent event = ringBuffer.get(sequence);//获取该序号对应的事件对象;
        long data = getData();//获取要通过事件传递的业务数据;
        event.set(data);
    } finally{
      //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;
               如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
      Disruptor 还提供另外一种形式的调用来简化以上操作,并确保 publish 总是得到调用。
        ringBuffer.publish(sequence);//发布事件;
    }
    }
    disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;
    executor.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;
  */
    
    //发布方式二 :一些复杂的操作放在Ring Buffer  
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    LongEventProducerWithTranslator translator = new LongEventProducerWithTranslator(ringBuffer);
    translator.onData(getData());
  }
    

  private  static long getData(){
   // return data.getAndIncrement();
    return ++count;
  }
 
}
View Code

 

两种发布方式

package com.disruptor;

import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

public class LongEventProducer {
  private final RingBuffer<LongEvent> ringBuffer;
  public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { 
      this.ringBuffer = ringBuffer; 
  } 

  /** 
   * onData用来发布事件,每调用一次就发布一次事件事件 
   * 它的参数会通过事件传递给消费者 
   * 
   * @param bb 
   */public void onData(ByteBuffer bb) { 
          //可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
          long sequence = ringBuffer.next();
      try { 
          //用上面的索引取出一个空的事件用于填充 
          LongEvent event = ringBuffer.get(sequence);// for the sequence 
          event.set(bb.getLong(0)); 
      } finally { 
          //发布事件 只ringBuffer
          ringBuffer.publish(sequence); 
      } 
  } 

}
View Code
package com.disruptor;

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

/**
 * Disruptor 3.0提供了lambda式的API。这样可以把一些复杂的操作放在Ring Buffer
 * ,所以在Disruptor3.0以后的版本最好使用Event Publisher或者Event Translator来发布事件。
 * @author gaojiay
 *
 */
public class LongEventProducerWithTranslator {

  //一个translator可以看做一个事件初始化器,publicEvent方法会调用它
  //填充Event
  private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR = 
      //Translator中方法的参数是通过RingBuffer来传递的。
      new EventTranslatorOneArg<LongEvent, Long>() { 
          public void translateTo(LongEvent event, long sequence, Long bb) { 
              event.set(bb); 
          } 
      };
      
      private final RingBuffer<LongEvent> ringBuffer;
      public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { 
          this.ringBuffer = ringBuffer; 
      } 
   
      public void onData(long bb) { 
          ringBuffer.publishEvent(TRANSLATOR, bb); 
      } 
}
View Code

测试第两种:

package com.disruptor;

import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

public class LongEventMain {

  public static void main(String[] args) throws InterruptedException {
    // Executor that will be used to construct new threads for consumers 
    Executor executor = Executors.newCachedThreadPool();
    // Specify the size of the ring buffer, must be power of 2.
    int bufferSize = 1024;// Construct the Disruptor 
    Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
    // 可以使用lambda来注册一个EventHandler 
    disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.get()));
    // Start the Disruptor, starts all threads running 
    disruptor.start();
    // Get the ring buffer from the Disruptor to be used for publishing. 
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); 

    LongEventProducer producer = new LongEventProducer(ringBuffer); 

    ByteBuffer bb = ByteBuffer.allocate(8);
    for (long l = 0; true; l++) { 
        bb.putLong(0, l); 
        ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb); 
        Thread.sleep(1000); 
    } 
} 
  
}
View Code

 

以上是关于Disruptor极速队列的主要内容,如果未能解决你的问题,请参考以下文章

高性能队列Disruptor使用入门,原理和代码实现

提升--20---Disruptor-----号称最快的消息队列

如何优雅地使用Disruptor

高性能并发队列Disruptor

高性能并发队列Disruptor

SpringBoot Disruptor 高性能内存消息队列