Disruptor使用

Posted lqblog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Disruptor使用相关的知识,希望对你有一定的参考价值。

Disruptor作者,介绍Disruptor能每秒处理600万订单。这是一个可怕的数字。

disruptor之所以那么快,是因为内部采用环形队列和无锁设计。使用cas来进行并发控制。通过获取可用下标来对事件发布和消费

下标通过cas控制(Atomic)

disruptor组成部分

          1.Disruptor:用于控制整个消费者-生产者模型的处理器 
   2.RingBuffer:用于存放数据 
   3.EventHandler:一个用于处理事件的接口(可以当做生产者,也可以当做消费者)。 
   4.EventFactory:事件工厂类。 
   5.WaitStrategy:用于实现事件处理等待RingBuffer游标策略的接口。 
   6.SequeueBarrier:队列屏障,用于处理访问RingBuffer的序列。 
   7.用于运行disruptor的线程或者线程池。

Disruptor简单使用

1.创建订单和订单事件

package com.liqang.test;

import java.math.BigDecimal;
/**
 * 简单模拟一个订单
 * @author Administrator
 *
 */
public class Order {
    private int id;
    private BigDecimal price;
    private double num;
    private int pid;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public BigDecimal getPrice() {
        return price;
    }
    public void setPrice(BigDecimal price) {
        this.price = price;
    }
    public double getNum() {
        return num;
    }
    public void setNum(double num) {
        this.num = num;
    }
    public int getPid() {
        return pid;
    }
    public void setPid(int pid) {
        this.pid = pid;
    }
}
package com.liqang.test;

import java.math.BigDecimal;
//订单事件  disruptor容器都是以事件对存在
public class OrderEvent {

    private Order order;

    public Order getOrder() {
        return order;
    }

    public void setOrder(Order order) {
        this.order = order;
    }
    
}

2.创建disruptor事件工厂

package bhz.base;

import com.lmax.disruptor.EventFactory;
// 需要让disruptor为我们创建事件,我们同时还声明了一个EventFactory来实例化Event对象。
public class LongEventFactory implements EventFactory { 

    @Override 
    public Object newInstance() { 
        return new LongEvent(); 
    } 
} 

disruptor会调用工厂方法为我们创建事件。并放到对应的事件槽里面

3.创建事件消费者处理类

/**
 * 事件消费者
 * @author Administrator
 *
 */
public class OrderEventHandle implements EventHandler<OrderEvent>{

    @Override
    public void onEvent(OrderEvent orderEvent, long arg1, boolean arg2) throws Exception {
        /**
         *做相应的业务处理
         */
        System.out.println(orderEvent.getOrder().getPid());
        
        
    }

}

4.创建事件生产者类

/**
 * 事件生产者
 * 
 * @author Administrator
 *
 */
public class OrderEvenProducer {
    private RingBuffer<OrderEvent> ringBuffer;// disruptor容器

    public OrderEvenProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(Order order) {
        long index = ringBuffer.next();// 首先获取下一个事件槽位置
        try {
            OrderEvent orderEvent = ringBuffer.get(index);// 通过序列获得disruptorFacotry创建好的事件槽
            orderEvent.setOrder(order);// 填充好业务数据
        } finally {
            ringBuffer.publish(index);// 发布事件。使用finally保证publish调用
        }

    }
}

任务是 根据容器 往容器里面注册数据

/**
 * 事件生产者
 * 
 * @author Administrator
 *
 */
public class OrderEvenProducer {
    private RingBuffer<OrderEvent> ringBuffer;// disruptor容器

    public OrderEvenProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(Order order) {
        long index = ringBuffer.next();// 首先获取下一个事件槽位置
        try {
            OrderEvent orderEvent = ringBuffer.get(index);// 通过序列获得disruptorFacotry创建好的事件槽
            orderEvent.setOrder(order);// 填充好业务数据
        } finally {
            ringBuffer.publish(index);// 发布事件。使用finally保证publish调用
        }

    }
}

5.测试

package com.liqang.test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import bhz.base.LongEventHandler;

public class OrderEventMain {
    public static void main(String[] args) {
        /**
         * 创建线程池 不限制大小 60秒不被使用就会被回收
         */
        ExecutorService executorService = Executors.newCachedThreadPool();

        OrderEventFactory factory = new OrderEventFactory();// 创建事件工厂
        // 创建bufferSize ,也就是RingBuffer大小,必须是2的N次方 diruptor减少计算事件槽的时间
        int ringBufferSize = 1024 * 1024; //
        /**
         * //BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现
         * WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
         * //SleepingWaitStrategy
         * 的性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景
         * WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
         * //YieldingWaitStrategy
         * 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性
         * WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
         */
        // 初始化disruptorProducerType.SINGLE 表示是单生产者
        Disruptor<OrderEvent> disruptor = new Disruptor<>(factory, ringBufferSize, executorService, ProducerType.SINGLE,
                new YieldingWaitStrategy());
        // 注册消费者事件处理器
        disruptor.handleEventsWith(new OrderEventHandle());
        // 启动
        disruptor.start();
        //创建生产者
        OrderEvenProducer orderEvenProducer=new OrderEvenProducer(disruptor.getRingBuffer());
        
        //模拟生产10个订单
        for (int i = 0; i <10; i++) {
            Order order=new Order();
            order.setId(i);
            order.setNum(i);
            order.setPid(i);
            order.setPid(i);
            orderEvenProducer.onData(order);
        }
        disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;
        executorService.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;        
    }
}

  disruptor3.0提供lambda表达式的方式(需要jdk8)发布事件 改造事件发布者类

package com.liqang.test;

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

import bhz.base.LongEvent;

/**
 * 事件生产者
 * 
 * @author Administrator
 *
 */
public class OrderEvenProducer {
    private static final EventTranslatorOneArg<OrderEvent, Order> TRANSLATOR = 
            new EventTranslatorOneArg<OrderEvent, Order>() {
                @Override
                public void translateTo(OrderEvent event, long sequeue, Order order) {
                    event.setOrder(order);
                }
            };
    
    private final RingBuffer<OrderEvent> ringBuffer;
    
    public OrderEvenProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    
    public void onData(Order order){
        ringBuffer.publishEvent(TRANSLATOR, order);
        
        //ringBuffer.publishEvent((event,sequeue,or)->{event.setOrder(order);},order);
    }
}

直接使用RingBuffer

package com.liqang.test;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;

import bhz.generate1.Trade;
import bhz.generate1.TradeHandler;

public class RingBufferTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {

     int bufferSize=1024*1024;
     //创建线程池
     ExecutorService executorService=Executors.newCachedThreadPool();
     //初始化ringbuffer
      RingBuffer<OrderEvent> ringBuffer=RingBuffer.createSingleProducer(new EventFactory<OrderEvent>() {

        @Override
        public OrderEvent newInstance() {
            // TODO Auto-generated method stub
            return new OrderEvent();
        }
    }, bufferSize, new YieldingWaitStrategy());
      //创建SequenceBarrier  
      SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
        
      //创建消息处理器  
      BatchEventProcessor<OrderEvent> transProcessor = new BatchEventProcessor<OrderEvent>(  
              ringBuffer, sequenceBarrier, new OrderEventHandle());  
        
      //这一步的目的就是把消费者的位置信息引用注入到生产者    如果只有一个消费者的情况可以省略 
      ringBuffer.addGatingSequences(transProcessor.getSequence());  
        
      //把消息处理器提交到线程池  
      executorService.submit(transProcessor);  
      
      //如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类  
        
      Future<?> future= executorService.submit(new Callable<Void>() {  
          @Override  
          public Void call() throws Exception {  
              long seq;  
              for(int i=0;i<10;i++){  
                  seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块  
                  OrderEvent orderEvent=ringBuffer.get(seq);
                  Order order=new Order();
                  order.setId(i);
                  order.setNum(i);
                  order.setPid(i);
                  order.setPid(i);
                  orderEvent.setOrder(order);
                  ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见  
              }  
              return null;  
          }  
      }); 
      
      future.get();//等待生产者结束  
      Thread.sleep(1000);//等上1秒,等消费都处理完成  
      transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)  
      executorService.shutdown();//终止线程  
}
}

Disruptor做复杂业务操作

disruptor还可以做很多复杂的业务操作

 如 一个事件c1 c2 处理器并行执行 执行完之后交给c3

 

public class Main {  
    public static void main(String[] args) throws InterruptedException {  
       
        long beginTime=System.currentTimeMillis();  
        int bufferSize=1024;  
        ExecutorService executor=Executors.newFixedThreadPool(8);  

        Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade();  
            }  
        }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
        
        //菱形操作
       
        //使用disruptor创建消费者组C1,C2  
        EventHandlerGroup<Trade> handlerGroup = 
                disruptor.handleEventsWith(new Handler1(), new Handler2());
        //声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3 (测试遇到一个问题就是 要队列被消费完了才会走到3)
        handlerGroup.then(new Handler3());
       
         
        
        disruptor.start();//启动  
        CountDownLatch latch=new CountDownLatch(1);  
        //生产者准备  
        executor.submit(new TradePublisher(latch, disruptor));
        
        latch.await();//等待生产者完事. 
       
        disruptor.shutdown();  
        executor.shutdown();  
        System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
    }  
}  
package bhz.generate2;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;

public class TradePublisher implements Runnable {  
    
    Disruptor<Trade> disruptor;  
    private CountDownLatch latch;  
    
    private static int LOOP=1000;//模拟百万次交易的发生  
  
    public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) {  
        this.disruptor=disruptor;  
        this.latch=latch;  
    }  
  
    @Override  
    public void run() {  for(int i=0;i<LOOP;i++){  
            disruptor.getRingBuffer().publishEvent((event,sequeue,or)->{event.setId("ff");},new Trade());
        }  
        latch.countDown();  
    }  
      
}  
  

 

顺序执行

c1执行后交给c2处理 c2 处理后交给c3处理

 package bhz.generate2;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import bhz.generate1.Trade;
import bhz.generate1.TradeHandler;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {  
    public static void main(String[] args) throws InterruptedException {  
       
        long beginTime=System.currentTimeMillis();  
        int bufferSize=1024;  
        ExecutorService executor=Executors.newFixedThreadPool(8);  

        Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade();  
            }  
        }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
        

        //顺序操作
       
        disruptor.handleEventsWith(new Handler1()).
            handleEventsWith(new Handler2()).
            handleEventsWith(new Handler3());
      
        
    
        
        disruptor.start();//启动  
        CountDownLatch latch=new CountDownLatch(1);  
        //生产者准备  
        executor.submit(new TradePublisher(latch, disruptor));
        
        latch.await();//等待生产者完事. 
       
        disruptor.shutdown();  
        executor.shutdown();  
        System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
    }  
}  

六边形操作

 

 package bhz.generate2;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import bhz.generate1.Trade;
import bhz.generate1.TradeHandler;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {  
    public static void main(String[] args) throws InterruptedException {  
       
        long beginTime=System.currentTimeMillis();  
        int bufferSize=1024;  
        ExecutorService executor=Executors.newFixedThreadPool(8);  

        Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade();  
            }  
        }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
         
        
        //六边形操作. 
     
        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
        disruptor.handleEventsWith(h1, h2);
        disruptor.after(h1).handleEventsWith(h4);
        disruptor.after(h2).handleEventsWith(h5);
        disruptor.after(h4, h5).handleEventsWith(h3);
        
        
        
        
        disruptor.start();//启动  
        CountDownLatch latch=new CountDownLatch(1);  
        //生产者准备  
        executor.submit(new TradePublisher(latch, disruptor));
        
        latch.await();//等待生产者完事. 
       
        disruptor.shutdown();  
        executor.shutdown();  
        System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
    }  
}  

h1 h2并行执行  h1执行完毕之后h4执行 h2执行完毕之后h5执行最终交给h3汇总执行

 

以上是关于Disruptor使用的主要内容,如果未能解决你的问题,请参考以下文章

如何优雅地使用Disruptor

disruptor架构三 使用场景更加复杂的场景

disruptor的并行用法

Disruptor使用

Disruptor 使用简介

为Disruptor 写的一个简单实用的.Net扩展