disruptor架构三 使用场景更加复杂的场景

Posted luzhouxiaoshuai

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了disruptor架构三 使用场景更加复杂的场景相关的知识,希望对你有一定的参考价值。

先c1和c2并行消费生产者产生的数据,然后c3再消费该数据

 我们来使用代码实现:我们可以使用Disruptor实例来实现,也可以不用产生Disruptor实例,直接调用RingBuffer的api来实现,不清楚看上一节使用的api函数workpool和BatchEventProcess来辅助实现消费者

上面需要使用的场景很复杂,只能使用Disruptor实例来实现线程通信,简单场景就直接使用RingBuffer就可以了

我们来看下程序的代码:

案例一:

package bhz.generate2;

import java.util.UUID;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class Handler1 implements EventHandler<Trade>,WorkHandler<Trade> {  
      
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
        System.out.println("handler1: set name");
        event.setName("h1");
        Thread.sleep(1000);
    }  
}  
package bhz.generate2;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventHandler;

public class Handler2 implements EventHandler<Trade> {  
      
    @Override  
    public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
        System.out.println("handler2: set price");
        event.setPrice(17.0);
        Thread.sleep(1000);
    }  
      
}  
package bhz.generate2;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventHandler;

public class Handler3 implements EventHandler<Trade> {
    @Override  
    public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
        System.out.println("handler3: name: " + event.getName() + " , price: " + event.getPrice() + ";  instance: " + event.toString());
    }  
}

 

我们来看下主函数

package bhz.generate2;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import bhz.generate1.Trade;
import bhz.generate1.TradeHandler;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {  
    public static void main(String[] args) throws InterruptedException {  
       
        long beginTime=System.currentTimeMillis();  
        int bufferSize=1024;  
        ExecutorService executor=Executors.newFixedThreadPool(8);  

        Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade();  
            }  
        }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
        
        //菱形操作
        //使用disruptor创建消费者组C1,C2  
        EventHandlerGroup<Trade> handlerGroup = 
                disruptor.handleEventsWith(new Handler1(), new Handler2());
        //声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3 
        handlerGroup.then(new Handler3());
        
        //顺序操作
        /**
        disruptor.handleEventsWith(new Handler1()).
            handleEventsWith(new Handler2()).
            handleEventsWith(new Handler3());
        */
        
        //六边形操作. 
        /**
        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
        disruptor.handleEventsWith(h1, h2);
        disruptor.after(h1).handleEventsWith(h4);
        disruptor.after(h2).handleEventsWith(h5);
        disruptor.after(h4, h5).handleEventsWith(h3);
        */
        
        
        
        disruptor.start();//启动  
        CountDownLatch latch=new CountDownLatch(1);  
        //生产者准备  
        executor.submit(new TradePublisher(latch, disruptor));
        
        latch.await();//等待生产者完事. 
       
        disruptor.shutdown();  
        executor.shutdown();  
        System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
    }  
}  

生产者的代码:
package bhz.generate2;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;

public class TradePublisher implements Runnable {  
    
    Disruptor<Trade> disruptor;  
    private CountDownLatch latch;  
    
    private static int LOOP=10;//模拟百万次交易的发生  
  
    public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) {  
        this.disruptor=disruptor;  
        this.latch=latch;  
    }  
  
    @Override  
    public void run() {  
        TradeEventTranslator tradeTransloator = new TradeEventTranslator();  
        for(int i=0;i<LOOP;i++){  
            disruptor.publishEvent(tradeTransloator);  
        }  
        latch.countDown();  
    }  
      
}  
  
class TradeEventTranslator implements EventTranslator<Trade>{  
    
    private Random random=new Random();  
    
    @Override  
    public void translateTo(Trade event, long sequence) {  
        this.generateTrade(event);  
    }  
    
    private Trade generateTrade(Trade trade){  
        trade.setPrice(random.nextDouble()*9999);  
        return trade;  
    }  
    
}  

hanlde1设置name,handle2设置价格,hanle3将名字和价格打印出来

我们来看下程序运行的代码:
 

handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@2aa2f9e6
handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@7d6c848f
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@5f73089d
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@793aac5f
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@7b0acf26
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@2a606e6
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@620ee765
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@4079ca2e
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@7bc8b313
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@3564e3e2
总耗时:10027

上面对应一个生产者,三个消费者,一个生产者提交了10个任务,上面每一个hashcode都是不同的

上面这种模式是一个生产者,多个消费者的情况

 场景2:

 

 


 我们来看下程序的代码:

package bhz.generate2;

import java.util.UUID;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class Handler4 implements EventHandler<Trade>,WorkHandler<Trade> {  
      
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
        System.out.println("handler4: get name : " + event.getName());
        event.setName(event.getName() + "h4");
    }  
}  

 

package bhz.generate2;

import java.util.UUID;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class Handler5 implements EventHandler<Trade>,WorkHandler<Trade> {  
      
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
        System.out.println("handler5: get price : " + event.getPrice());
        event.setPrice(event.getPrice() + 3.0);
    }  
}  

 

主函数:

package bhz.generate2;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import bhz.generate1.Trade;
import bhz.generate1.TradeHandler;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {  
    public static void main(String[] args) throws InterruptedException {  
       
        long beginTime=System.currentTimeMillis();  
        int bufferSize=1024;  
        ExecutorService executor=Executors.newFixedThreadPool(8);  

        Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade();  
            }  
        }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
        
  /*      //菱形操作
        //使用disruptor创建消费者组C1,C2  
        EventHandlerGroup<Trade> handlerGroup = 
                disruptor.handleEventsWith(new Handler1(), new Handler2());
        //声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3 
        handlerGroup.then(new Handler3());*/
        
        //顺序操作
        /**
        disruptor.handleEventsWith(new Handler1()).
            handleEventsWith(new Handler2()).
            handleEventsWith(new Handler3());
        */
        
        //六边形操作. 
     
        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
        disruptor.handleEventsWith(h1, h2);
        disruptor.after(h1).handleEventsWith(h4);
        disruptor.after(h2).handleEventsWith(h5);
        disruptor.after(h4, h5).handleEventsWith(h3);
        
        
        
        
        disruptor.start();//启动  
        CountDownLatch latch=new CountDownLatch(1);  
        //生产者准备  
        executor.submit(new TradePublisher(latch, disruptor));
        
        latch.await();//等待生产者完事. 
       
        disruptor.shutdown();  
        executor.shutdown();  
        System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
    }  
}  

 

程序运行的效果:

handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler2: set price
handler1: set name
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@2f30e44c
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@32dc51c8
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@1e7d3b8e
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@2fd0f745
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@4019eb89
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@65b6903f
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@34b84c44
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@2f971f66
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@6a8e79f2
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@2fe83585
总耗时:10368

3、案例三:如何实现按顺利操作了,c1执行完了执行c2,c2执行完了执行c3

package bhz.generate2;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import bhz.generate1.Trade;
import bhz.generate1.TradeHandler;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {  
    public static void main(String[] args) throws InterruptedException {  
       
        long beginTime=System.currentTimeMillis();  
        int bufferSize=1024;  
        ExecutorService executor=Executors.newFixedThreadPool(8);  

        Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade();  
            }  
        }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
        
  /*      //菱形操作
        //使用disruptor创建消费者组C1,C2  
        EventHandlerGroup<Trade> handlerGroup = 
                disruptor.handleEventsWith(new Handler1(), new Handler2());
        //声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3 
        handlerGroup.then(new Handler3());*/
        
        //顺序操作
        disruptor.handleEventsWith(new Handler1()).
            handleEventsWith(new Handler2()).
            handleEventsWith(new Handler3());
        
        //六边形操作. 
    /* 
        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
        disruptor.handleEventsWith(h1, h2);
        disruptor.after(h1).handleEventsWith(h4);
        disruptor.after(h2).handleEventsWith(h5);
        disruptor.after(h4, h5).handleEventsWith(h3);*/
        
        
        
        
        disruptor.start();//启动  
        CountDownLatch latch=new CountDownLatch(1);  
        //生产者准备  
        executor.submit(new TradePublisher(latch, disruptor));
        
        latch.await();//等待生产者完事. 
       
        disruptor.shutdown();  
        executor.shutdown();  
        System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
    }  
}  

执行执行的效果如下所示:

handler1: set name
handler1: set name
handler1: set name
handler1: set name
handler1: set name
handler1: set name
handler1: set name
handler1: set name
handler1: set name
handler1: set name
handler2: set price
handler2: set price
handler2: set price
handler2: set price
handler2: set price
handler2: set price
handler2: set price
handler2: set price
handler2: set price
handler2: set price
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@51532e9f
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@2c19f26f
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@5b7b6c07
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@3f16e7d
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@5f8cd290
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@16541cf2
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@6c385f51
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@68f9f658
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@729c413f
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@707fc9e1
总耗时:20049

以上是关于disruptor架构三 使用场景更加复杂的场景的主要内容,如果未能解决你的问题,请参考以下文章

Disruptor并发框架 核心概念场景分析

disruptor笔记之六:常见场景

《重学Java高并发》disruptor在数据同步场景下的应用实战(技术方案设计实战)

《重学Java高并发》disruptor在数据同步场景下的应用实战(技术方案设计实战)

《重学Java高并发》disruptor在数据同步场景下的应用实战(技术方案设计实战)

disruptor 史上最全之3: 8大使用场景详细图解