[Java Concurrent] A simple example of thread cooperation: producer-consumers / queue

Posted by TonyYPZhang


This example implements the producer-consumer scenario in a multithreaded environment using BlockingQueue.

 

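Toast is the object that gets produced and consumed.

ToastQueue extends LinkedBlockingQueue and serves as intermediate storage for Toast objects.

Producer makes Toast objects and puts each one into the queue initialToastQ.

Processor takes each Toast from initialToastQ, processes it, and puts it into the queue finishedToastQ.

Consumer takes the processed Toast objects from finishedToastQ.

ThreadHelper is a utility class that prints thread-related information.

ProducerConsumerDemo runs the whole scenario.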
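Before the full classes, the two-queue handoff described above can be sketched in a condensed form. This is only an illustration under some assumptions: the class name HandoffSketch, the method runPipeline, and the use of plain String items instead of Toast are made up for this sketch, and each stage handles a fixed number of items so that the threads finish on their own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, not part of the original example.
class HandoffSketch {

    // Pushes n items through two queues with three threads and
    // returns what the last stage received, in arrival order.
    static List<String> runPipeline(int n) {
        BlockingQueue<String> initialQ = new LinkedBlockingQueue<>();
        BlockingQueue<String> finishedQ = new LinkedBlockingQueue<>();
        List<String> consumed = new ArrayList<>();

        // Stage 1: put() hands each new item to the first queue.
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    initialQ.put("toast#" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Stage 2: take() blocks until stage 1 has delivered an item.
        Thread processor = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    finishedQ.put(initialQ.take() + " (processed)");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Stage 3: take() blocks until stage 2 has delivered an item.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    consumed.add(finishedQ.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        processor.start();
        consumer.start();
        try {
            // join() also makes the consumer's writes to the list visible here.
            producer.join();
            processor.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }
}
```

With one thread per stage and FIFO queues, the items arrive in the order they were produced. No explicit locking or wait/notify is needed, because put() and take() do the synchronization.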

 

 

Implementation:

Toast implementation

public class Toast {
    
    private int id;
    
    public Toast(int id){
        this.id = id;
    }

    public String toString(){
        return " toast#" + id;
    }
}

ToastQueue implementation

import java.util.concurrent.LinkedBlockingQueue;

public class ToastQueue extends LinkedBlockingQueue<Toast> {
    private static final long serialVersionUID = 1L;
}
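ToastQueue uses the no-argument LinkedBlockingQueue constructor, so its capacity is Integer.MAX_VALUE and put() practically never blocks. If a slow downstream stage should throttle the Producer, a bounded variant is one option. The sketch below is an assumption, not part of the original example: the class BoundedQueueSketch and its helper methods are invented for illustration, and it stores String instead of Toast to stay self-contained.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical bounded counterpart of ToastQueue.
class BoundedQueueSketch extends LinkedBlockingQueue<String> {
    private static final long serialVersionUID = 1L;

    BoundedQueueSketch(int capacity) {
        super(capacity); // LinkedBlockingQueue(int) fixes the maximum size
    }

    // Remaining capacity of a queue built with the no-argument constructor.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // Tries to add three items to a queue of capacity 2 without blocking
    // and reports whether the third offer was accepted.
    static boolean thirdOfferAccepted() {
        BoundedQueueSketch q = new BoundedQueueSketch(2);
        q.offer("toast#0");
        q.offer("toast#1");
        return q.offer("toast#2"); // offer() returns false when the queue is full
    }
}
```

On a full bounded queue, put() would block the calling thread until space is available, while offer() returns false immediately.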

Producer makes Toast objects in a loop

import java.util.concurrent.TimeUnit;

public class Producer implements Runnable {

    private ToastQueue toastQueue;
    private int count;
    
    public Producer(ToastQueue toastQueue){
        this.toastQueue = toastQueue;
        this.count = 0;
    }
    
    @Override
    public void run() {
        try {
            while (true){
                TimeUnit.MILLISECONDS.sleep(100);
                
                Toast toast = new Toast(count);
                count++;
                toastQueue.put(toast);
                ThreadHelper.print(" produced " + toast);
            }
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Processor takes a Toast from initialToastQ, processes it, and puts it into finishedToastQ.

import java.util.concurrent.TimeUnit;

public class Processor implements Runnable {

    private ToastQueue initialToastQ;
    private ToastQueue finishedToastQ;
    
    
    public Processor(ToastQueue initialToastQ, ToastQueue finishedToastQ){
        this.initialToastQ = initialToastQ;
        this.finishedToastQ = finishedToastQ;
    }
    
    @Override
    public void run() {
        try {
            while (true){
                Toast toast = initialToastQ.take();
                
                ThreadHelper.print(" processing " + toast);

                TimeUnit.MILLISECONDS.sleep(180);
                
                finishedToastQ.put(toast);
            }
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
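Because the Producer sleeps 100 ms per Toast and the Processor sleeps 180 ms, initialToastQ fills faster than it drains. The following is a rough, idealized estimate of that backlog; the class BacklogEstimate and its method names are invented for this sketch, and the model ignores scheduling and printing overhead and assumes the Processor is the slower stage.

```java
// Hypothetical back-of-the-envelope model, not part of the original example.
class BacklogEstimate {

    // Toasts put into the first queue after elapsedMs, one every produceMs.
    static long produced(long elapsedMs, long produceMs) {
        return elapsedMs / produceMs;
    }

    // Toasts the Processor has taken so far: the first take happens when the
    // first toast exists (at produceMs), then one more every processMs.
    static long taken(long elapsedMs, long produceMs, long processMs) {
        if (elapsedMs < produceMs) {
            return 0;
        }
        long started = (elapsedMs - produceMs) / processMs + 1;
        return Math.min(started, produced(elapsedMs, produceMs));
    }

    // Toasts still waiting in the first queue.
    static long backlog(long elapsedMs, long produceMs, long processMs) {
        return produced(elapsedMs, produceMs) - taken(elapsedMs, produceMs, processMs);
    }
}
```

For the 2-second run in this post, the model gives 20 produced, 11 taken, and a backlog of 9. The recorded output further down shows 19 produced (toast#0 to toast#18) and 11 taken (toast#0 to toast#10), which is close; the small difference is consistent with the overhead the model leaves out.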

Consumer consumes Toast objects

public class Consumer implements Runnable {

    private ToastQueue finishedToastQ;

    public Consumer(ToastQueue finishedToastQ){
        this.finishedToastQ = finishedToastQ;
    }
    
    @Override
    public void run() {
        try {
            while (true){
                Toast toast = finishedToastQ.take();
                ThreadHelper.print(" consumed " + toast);
            }
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

ThreadHelper, a thread helper class

public class ThreadHelper {    
    public static void print(String msg){
        System.out.println("[" + Thread.currentThread().getName() + " ] " + msg);
    }
}

Demo of producing, processing, and consuming toast

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ProducerConsumerDemo {

    public static void main(String[] args) throws InterruptedException{
        
        ToastQueue initialToastQ = new ToastQueue();
        ToastQueue finishedToastQ = new ToastQueue();
        
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Producer(initialToastQ));
        exec.execute(new Processor(initialToastQ, finishedToastQ));
        exec.execute(new Consumer(finishedToastQ));

        TimeUnit.SECONDS.sleep(2);
        exec.shutdownNow();
    }
}
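shutdownNow() only sends an interrupt to the worker threads and returns immediately; it does not wait for them to finish. If the caller needs to know that the workers have actually stopped, awaitTermination can follow it. A minimal sketch, assuming a single worker that blocks on an empty queue (the class ShutdownSketch and the method runAndStop are invented names):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original example.
class ShutdownSketch {

    // Starts one blocking worker, interrupts it, and reports whether the
    // pool terminated within one second.
    static boolean runAndStop() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        ExecutorService exec = Executors.newCachedThreadPool();

        exec.execute(() -> {
            try {
                while (true) {
                    queue.take(); // blocks forever: nothing is ever put
                }
            } catch (InterruptedException e) {
                // Interrupted by shutdownNow(): leave the loop and end the task.
                Thread.currentThread().interrupt();
            }
        });

        try {
            TimeUnit.MILLISECONDS.sleep(50);  // let the worker reach take()
            exec.shutdownNow();               // interrupts the worker
            return exec.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

awaitTermination returns true once all tasks have completed after the shutdown request, and false if the timeout elapses first.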

 

 

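Output (the Processor's line for a toast can appear before the Producer's line for the same toast, because the Producer prints only after put() returns; the stack traces at the end are printed when shutdownNow() interrupts the three threads):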

[pool-1-thread-2 ]  processing  toast#0
[pool-1-thread-1 ]  produced  toast#0
[pool-1-thread-1 ]  produced  toast#1
[pool-1-thread-2 ]  processing  toast#1
[pool-1-thread-3 ]  consumed  toast#0
[pool-1-thread-1 ]  produced  toast#2
[pool-1-thread-1 ]  produced  toast#3
[pool-1-thread-2 ]  processing  toast#2
[pool-1-thread-3 ]  consumed  toast#1
[pool-1-thread-1 ]  produced  toast#4
[pool-1-thread-1 ]  produced  toast#5
[pool-1-thread-2 ]  processing  toast#3
[pool-1-thread-3 ]  consumed  toast#2
[pool-1-thread-1 ]  produced  toast#6
[pool-1-thread-1 ]  produced  toast#7
[pool-1-thread-2 ]  processing  toast#4
[pool-1-thread-3 ]  consumed  toast#3
[pool-1-thread-1 ]  produced  toast#8
[pool-1-thread-2 ]  processing  toast#5
[pool-1-thread-3 ]  consumed  toast#4
[pool-1-thread-1 ]  produced  toast#9
[pool-1-thread-1 ]  produced  toast#10
[pool-1-thread-2 ]  processing  toast#6
[pool-1-thread-3 ]  consumed  toast#5
[pool-1-thread-1 ]  produced  toast#11
[pool-1-thread-1 ]  produced  toast#12
[pool-1-thread-2 ]  processing  toast#7
[pool-1-thread-3 ]  consumed  toast#6
[pool-1-thread-1 ]  produced  toast#13
[pool-1-thread-1 ]  produced  toast#14
[pool-1-thread-2 ]  processing  toast#8
[pool-1-thread-3 ]  consumed  toast#7
[pool-1-thread-1 ]  produced  toast#15
[pool-1-thread-1 ]  produced  toast#16
[pool-1-thread-2 ]  processing  toast#9
[pool-1-thread-3 ]  consumed  toast#8
[pool-1-thread-1 ]  produced  toast#17
[pool-1-thread-2 ]  processing  toast#10
[pool-1-thread-3 ]  consumed  toast#9
[pool-1-thread-1 ]  produced  toast#18
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at concurrencyProducerConsumer.Consumer.run(Consumer.java:15)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:340)
    at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
    at concurrencyProducerConsumer.Producer.run(Producer.java:19)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:340)
    at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
    at concurrencyProducerConsumer.Processor.run(Processor.java:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
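The three stack traces above come from the e.printStackTrace() calls in the catch blocks. Since interruption is the intended stop signal in this demo, a worker could instead exit quietly and restore the interrupt flag. The following is one possible variation, not the original code: QuietConsumerSketch, its consumed counter, and the demo method are invented for illustration, and it consumes String items to stay self-contained.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variation of Consumer that treats interruption as a normal stop.
class QuietConsumerSketch implements Runnable {

    private final BlockingQueue<String> queue;
    final AtomicInteger consumed = new AtomicInteger();

    QuietConsumerSketch(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                queue.take();
                consumed.incrementAndGet();
            }
        } catch (InterruptedException e) {
            // Expected when the thread is asked to stop: no stack trace,
            // just restore the flag so callers can still see the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    // Feeds three items, interrupts the consumer, and returns the number
    // of items consumed, or -1 if the thread did not stop in time.
    static int demo() {
        BlockingQueue<String> q = new LinkedBlockingQueue<>();
        QuietConsumerSketch consumer = new QuietConsumerSketch(q);
        Thread t = new Thread(consumer);
        t.start();
        try {
            q.put("toast#0");
            q.put("toast#1");
            q.put("toast#2");
            while (consumer.consumed.get() < 3) {
                Thread.sleep(5); // wait until all three were taken
            }
            t.interrupt();
            t.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return t.isAlive() ? -1 : consumer.consumed.get();
    }
}
```

With this handling the run ends without any InterruptedException trace on the console, while the thread still terminates as soon as it is interrupted.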

 

References

Page 868, Producer-consumers and queue, Thinking in Java

 
