disruptor的并行用法

Posted soft.push("zzq")

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了disruptor的并行用法相关的知识,希望对你有一定的参考价值。

实现EventFactory,在newInstance方法中返回,ringBuffer缓冲区中的对象实例;代码如下:

public class DTaskFactory implements EventFactory<DTask> {
    @Override
    public DTask newInstance() {//disruptor使用环形缓冲区,这是环形缓冲区所承载的对象
        return new DTask();
    }
}

生产消费的对象类型:

public class DTask {
    public String getName1() {
        return name1;
    }

    public void setName1(String name1) {
        this.name1 = name1;
    }

    public String getName2() {
        return name2;
    }

    public void setName2(String name2) {
        this.name2 = name2;
    }

    public String getName3() {
        return name3;
    }

    public void setName3(String name3) {
        this.name3 = name3;
    }

    String name1;
    String name2;
    String name3;

}

disruptor的消费处理事件onEvent为消费调用的方法(下面的代码中包含并行和串行执行的消费事件):

public class DTaskHandle implements EventHandler<DTask> {
    @Override
    public void onEvent(DTask dTask, long l, boolean b) throws Exception {
        System.out.println("开始最后消费");
        System.out.println(dTask.getName1());

        System.out.println(dTask.getName2());
        System.out.println(dTask.getName3());
        System.out.println("结束最后消费");
    }
}

public class DTaskHandle1 implements EventHandler<DTask> {
    @Override
    public void onEvent(DTask dTask, long l, boolean b) throws Exception {
        System.out.println("-----DTaskHandle1-----");
        dTask.setName1("name1");
    }
}

public class DTaskHandle2 implements EventHandler<DTask> {
    @Override
    public void onEvent(DTask dTask, long l, boolean b) throws Exception {
        System.out.println("-----DTaskHandle2-----");
        dTask.setName2("name2");
    }
}

public class DTaskHandle3 implements EventHandler<DTask> {
    @Override
    public void onEvent(DTask dTask, long l, boolean b) throws Exception {
        System.out.println("-----DTaskHandle3-----");
        dTask.setName3("name3");
    }
}

测试执行类:

public class DisruptorTest {

    public void exec() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        Disruptor<DTask> disruptor = new Disruptor(new DTaskFactory(),
                1024 * 1024,
                executor,
                ProducerType.SINGLE, new BusySpinWaitStrategy());

        DTaskHandle dTaskHandle = new DTaskHandle();
        DTaskHandle1 dTaskHandle1 = new DTaskHandle1();
        DTaskHandle2 dTaskHandle2 = new DTaskHandle2();
        DTaskHandle3 dTaskHandle3 = new DTaskHandle3();
        disruptor.handleEventsWith(dTaskHandle1, dTaskHandle2, dTaskHandle3);//消费生产出的对象,并行执行

        disruptor.after(dTaskHandle1, dTaskHandle2, dTaskHandle3).handleEventsWith(dTaskHandle);//并行执行1 2 3后,串行执行dTaskHandle

//        disruptor.

        disruptor.start();
        CountDownLatch latch = new CountDownLatch(1);
        //生产者准备
        executor.submit(new TradePublisher(latch, disruptor));

        latch.await();//等待生产者完事.

        disruptor.shutdown();
        executor.shutdown();
    }

}

 

以上是关于disruptor的并行用法的主要内容,如果未能解决你的问题,请参考以下文章

Disruptor - 消费者是多线程的吗?

Disruptor极速队列

如何在 python 中并行化以下代码片段?

Disruptor 高性能并发框架二次封装

LMAX Disruptor - 维护事件的顺序

[工作积累] UE4 并行渲染的同步 - Sync between FParallelCommandListSet & FRHICommandListImmediate calls(代码片段