disruptor 单生产者多消费者

Posted zhaopengcheng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了disruptor 单生产者多消费者相关的知识,希望对你有一定的参考价值。

demo1 单生产者多消费者创建。

maven 依赖

<!-- https://mvnrepository.com/artifact/com.lmax/disruptor -->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.2</version>
        </dependency>

 

1 对象 - Message

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Message2 
    private String id;
    private String name;
    private double price;

2 在主函数中创建 disruptor

Disruptor<Message2> disruptor = new Disruptor<>(
                new EventFactory<Message2>() 
                    @Override
                    public Message2 newInstance() 
                        return new Message2();
                    
                ,
                1 << 10,
                Executors.defaultThreadFactory(),
                ProducerType.SINGLE,
                new BusySpinWaitStrategy()
        );

 

3 disruptor 绑定消费者

// disruptor 绑定消费者
disruptor.handleEventsWith(new MessageHandler1());


//创建消费者
@Slf4j
public class MessageHandler1 implements EventHandler<Message2> 
    @Override
    public void onEvent(Message2 event, long sequence, boolean endOfBatch) throws Exception 
        event.setId(UUID.randomUUID().toString());
        log.info("【handler1,set id】 id: , name: , price: ", event.getId(), event.getName(), event.getPrice());
    

 

4 启动 disruptor

RingBuffer<Message2> ringBuffer = disruptor.start();

 

5 disruptor 绑定生产者

//绑定生产者
CountDownLatch latch = new CountDownLatch(1);
ExecutorService es = Executors.newFixedThreadPool(4);
es.submit(new MessagePublish2(disruptor, latch));

// 生产者类
public class MessagePublish2 implements Runnable 
    private Disruptor<Message2> disruptor;
    private CountDownLatch latch;

    public MessagePublish2(Disruptor<Message2> disruptor, CountDownLatch latch) 
        this.disruptor = disruptor;
        this.latch = latch;
    

    @Override
    public void run() 
        for (int i = 0; i < 3; i++) 
            disruptor.publishEvent(new MessageEventTranslator());
        
        latch.countDown();
    

 

6 阻塞等待 & 关闭服务

        // 阻塞等待
        latch.await();

        // 关闭服务
        es.shutdown();
        disruptor.shutdown();

 

以上是关于disruptor 单生产者多消费者的主要内容,如果未能解决你的问题,请参考以下文章

Disruptor 高性能并发框架二次封装

disruptor

为Disruptor 写的一个简单实用的.Net扩展

Disruptor多个消费者不重复处理生产者发送过来的消息

优化技术专题「线程间的高性能消息框架」再次细节领略Disruptor的底层原理和优势分析

Disruptor之粗糙认识