disruptor架构三 使用场景 使用WorkHandler和BatchEventProcessor辅助创建消费者
Posted luzhouxiaoshuai
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了disruptor架构三 使用场景 使用WorkHandler和BatchEventProcessor辅助创建消费者相关的知识,希望对你有一定的参考价值。
在helloWorld的实例中,我们创建Disruptor实例,然后调用getRingBuffer方法去获取RingBuffer,其实在很多时候,我们可以直接使用RingBuffer,以及其他的API操作。我们一起熟悉下示例:
使用EventProcessor消息处理器。
BatchEventProcessor 多线程并发执行,不同线程执行不同是不同的event
EventProcessor有3个实现类
BatchEventProcessor 多线程并发执行,不同线程执行不同是不同的event
使用BatchEventProcessor 消费者需要实现EventHandler接口
我们来看下面的代码:
需要处理的实体类
package bhz.generate1; import java.util.concurrent.atomic.AtomicInteger; public class Trade { private String id;//ID private String name; private double price;//金额 private AtomicInteger count = new AtomicInteger(0); public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } public AtomicInteger getCount() { return count; } public void setCount(AtomicInteger count) { this.count = count; } }
消费者类:
package bhz.generate1; import java.util.UUID; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { //杩欓噷鍋氬叿浣撶殑娑堣垂閫昏緫 event.setId(UUID.randomUUID().toString());//绠?崟鐢熸垚涓婭D System.out.println(event.getId()); } }
消费者除了实现EventHandler接口之外,还实现了WorkHandler接口,为啥了,因为后面我们要使用了WokerPool来发送该实体类,所以这里就让该实体类实现两个接口
我们来看看main方法
package bhz.generate1; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.lmax.disruptor.BatchEventProcessor; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventProcessor; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.YieldingWaitStrategy; public class Main1 { public static void main(String[] args) throws Exception { int BUFFER_SIZE=1024; int THREAD_NUMBERS=4; /* * createSingleProducer创建一个单生产者的RingBuffer, * 第一个参数叫EventFactory,从名字上理解就是"事件工厂",其实它的职责就是产生数据填充RingBuffer的区块。 * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率 * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略 */ final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(); } }, BUFFER_SIZE, new YieldingWaitStrategy()); //创建线程池 ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); //创建SequenceBarrier ,用于平衡生产者和消费者速率,用障碍来处理 SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //创建消息处理器 BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>( ringBuffer, sequenceBarrier, new TradeHandler()); //这一步的目的就是把消费者的位置信息引用注入到生产者 如果只有一个消费者的情况可以省略 ,将生产者和消费者关联起来 ringBuffer.addGatingSequences(transProcessor.getSequence()); //把消息处理器提交到线程池 executors.submit(transProcessor); //如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类 Future<?> future= executors.submit(new Callable<Void>() { @Override public Void call() throws Exception { long seq; for(int i=0;i<10;i++){ seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块 ringBuffer.get(seq).setPrice(Math.random()*9999);//给这个区块放入 数据 ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见 } return null; } }); future.get();//等待生产者结束 Thread.sleep(1000);//等上1秒,等消费都处理完成 transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!) executors.shutdown();//终止线程 } }
//创建消息处理器
BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(
ringBuffer, sequenceBarrier, new TradeHandler());
它主要有三个成员RingBuffer、SequenceBarrier和EventHandler
上面对应对应的是一个生产者,一个消费者的情况
我们来看看程序运行的效果
1a7226d0-e212-4183-b109-cab5e5c41545
3e1da0fa-686d-4361-bea2-600c2c5d26b9
bf31874a-3405-4008-80e7-03caf9f16ae4
080a05ef-0052-4271-a2ee-ee50038a5a77
71e1a5a8-24ba-4175-b53a-f8b71e99464a
99670de9-6aa5-48fa-8fa2-a490250e25ba
7a44b351-0caa-4ac3-b344-97cf72c9dd5f
10a7fe52-eef1-453c-80a2-126fd8bac948
c78f2ed5-3c3e-4481-9062-dd96ff7ba051
49f51ad6-2ee5-4c36-a0d0-96bc0e17fba9
如果是一个生产者,对应多个消费者,那么
//创建消息处理器
BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(
ringBuffer, sequenceBarrier, new TradeHandler());
//这一步的目的就是把消费者的位置信息引用注入到生产者 如果只有一个消费者的情况可以省略 ,将生产者和消费者关联起来
ringBuffer.addGatingSequences(transProcessor.getSequence());
//把消息处理器提交到线程池
executors.submit(transProcessor);
//如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类
所以:BatchEventProcessor 多线程并发执行,不同线程执行不同是不同的event
2、使用WorkerPool消息处理器。
消费者需要实现:WorkHandler接口
我们来看看主程序的代码:
package bhz.generate1; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.IgnoreExceptionHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.WorkerPool; public class Main2 { public static void main(String[] args) throws InterruptedException { int BUFFER_SIZE=1024; int THREAD_NUMBERS=4; EventFactory<Trade> eventFactory = new EventFactory<Trade>() { public Trade newInstance() { return new Trade(); } }; RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS); WorkHandler<Trade> handler = new TradeHandler(); WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler); workerPool.start(executor); //下面这个生产8个数据 for(int i=0;i<8;i++){ long seq=ringBuffer.next(); ringBuffer.get(seq).setPrice(Math.random()*9999); ringBuffer.publish(seq); } Thread.sleep(1000); workerPool.halt(); executor.shutdown(); } }
程序运行的效果:
4bbffa55-b19f-44a4-bfa7-100affc63323
121a0ee8-7e8e-4637-b659-ca78ae9aaa20
0fc1cdb8-8186-44fc-a3a5-4bf5fea66086
afb70a80-e1ce-46f9-bfc1-4e0d81be96b4
0e0b3690-830b-4d38-b78b-e0930b499515
f5b4e23f-10c8-45ea-b064-32ae40f54912
4a172494-480a-4509-99d0-d416b5e2c5c9
902c0669-6196-423e-9924-31cb9633bbb5
以上是关于disruptor架构三 使用场景 使用WorkHandler和BatchEventProcessor辅助创建消费者的主要内容,如果未能解决你的问题,请参考以下文章
Disruptor底层实现讲解与RingBuffer数据结构讲解
Java&Go高性能队列之Disruptor性能测试#yyds干货盘点#