生产者消费者模式

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了生产者消费者模式相关的知识,希望对你有一定的参考价值。

参考技术A 解决并发问题。

通过一个容器解决生产者和消费者强耦合问题,否则消费者只能排队等生产者串行解决。
生产者和消费者之间不直接通信,而是通过一个阻塞队列来通信。阻塞队列就是一个缓冲区。

阻塞队列是解决生产者消费者的关键。
-- 两种自定义阻塞队列方式
-- jdk1.5版本以后java.util.concurrent提供了阻塞队列类

阻塞队列接口:

jdk1.5版本后新增的java.util.concurrent包新增了java.util.concurrent.BlockingQueue接口,并且提供了以下几种实现:

暂不考虑

测试结果:Producer执行put的时候会校验queue是不是满了,满了要等不满再put,Consumer执行take的时候会校验queue是不是空的,空了要等不空再take。
每个consumer都会在遇到#后结束,并且consumer去抢夺producer产生的data。

Java的设计模式— 生产者-消费者模式

  生产者-消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。这个模式中,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。生产者和消费者之间通过共享内存缓存区进行通信,这样就避免了生产者和消费者直接通信,从而将生产者和消费者解耦。不管是生产高于消费,还是消费高于生产,缓存区的存在可以确保系统的正常运行。这个模式有以下几种角色:

  • 生产者:用于提交用户的请求,提取用户任务,装入内存缓冲区。
  • 消费者:在内存缓冲区中提取并处理任务。
  • 内存缓冲区:缓存生产者提交的任务或数据,供消费者使用。
  • 任务:生产者向内存缓冲区提交的数据结构。
  • Main:使用消费者和生产者的客户端。

  其中BlockingQueue充当了共享内存缓冲区,用于维护任务或数据队列(PCData对象)。PCData表示一个生产任务,或者相关任务的数据,生产者对象和消费者对象均引用一个BlockingQueue实例。生产者负责创建PCData对象,并将它加入队列中,消费者从这个队列中获取PCData对象。下面举个例子:

  首先生产者线程实现如下,它构建PCData对象,并放入BlockingQueue队列中

public class Producer implements Runnable 

    private volatile boolean isRunning = true;
    private BlockingQueue<PCData> queue;                        // 内存缓存区
    private static AtomicInteger count = new AtomicInteger();   // 总数,原子操作
    private static final int SLEEPTIME = 1000;
    
    public Producer(BlockingQueue<PCData> queue) 
        this.queue = queue;
    

    @Override
    public void run() 
        PCData data = null;
        Random r = new Random();
        System.out.println("start producer id = "+Thread.currentThread().getId());
        try 
            while(isRunning)
                Thread.sleep(SLEEPTIME);
                data = new PCData(count.incrementAndGet());    // 构造任务数据
                System.out.println(data+" is put into queue");
                if(!queue.offer(data,2,TimeUnit.SECONDS))     // 提交到数据缓存区中
                    System.out.println("failed to put data:"+data);
                
            
         catch (InterruptedException e) 
            e.printStackTrace();
            Thread.currentThread().interrupt();
        

    

    public void stop()
        isRunning = false;
    

 

  对应的消费者实现如下,它从BlockingQueue队列中取出PCData对象,并进行相应的计算。

public class Consumer implements Runnable 

    private BlockingQueue<PCData> queue; 
    private static final int SLEEPTIME = 1000;
    
    public Consumer(BlockingQueue<PCData> queue) 
        this.queue = queue;
    

    @Override
    public void run() 
        System.out.println("start Consumer id = "+Thread.currentThread().getId());
        Random r = new Random();
        try 
        while(true)
            PCData data  =this.queue.take();
            if(null!=data)
                int re = data.getData() * data.getData();
                System.out.println(MessageFormat.format("0*1=2", data.getData(),data.getData(),re));
                Thread.sleep(r.nextInt(SLEEPTIME));
            
        
         catch (InterruptedException e) 
            e.printStackTrace();
            Thread.currentThread().interrupt();
        
    

  PCData对象作为生产者和消费者之间的共享数据模型,定义如下:

public class PCData 

    private final int intData;
    public PCData(int d)
        intData = d;
    
    public PCData(String d)
        intData = Integer.valueOf(d);
    
    public int getData()
        return intData;
    
    @Override
    public String toString() 
        return "intData:" + intData;
    
    

  在主函数中,创建三个生产者和三个消费者,并让他们协作运行,在主函数实现中,定义LinkedBlockingQueue作为BlockingQueue队列的实现类。

public class Main 
    public static void main(String[] args) throws InterruptedException 
        BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>();
        Producer p1 = new Producer(queue);
        Producer p2 = new Producer(queue);
        Producer p3 = new Producer(queue);
        Consumer c1 = new Consumer(queue);
        Consumer c2 = new Consumer(queue);
        Consumer c3 = new Consumer(queue);
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(p1);
        service.execute(p2);
        service.execute(p3);
        service.execute(c1);
        service.execute(c2);
        service.execute(c3);
        Thread.sleep(10000);
        p1.stop();
        p2.stop();
        p3.stop();
        Thread.sleep(3000);
        service.shutdown();
    

  生产者-消费者模式很好地对生产者线程和消费者线程进行解耦,优化了系统整体结构。同时,由于缓冲作用,允许生产者和消费者线程存在执行上的性能差异,从一定程度上解决了性能瓶颈对系统性能的影响。

以上是关于生产者消费者模式的主要内容,如果未能解决你的问题,请参考以下文章

设计模式—生产者消费者模式

实现生产者与消费者模式

Java生产者消费者模式

一、多线程下生产者消费者模式

Java的设计模式— 生产者-消费者模式

设计模式之生产者消费者模式