Java并发之:生产者消费者问题

Posted Zhao Gang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发之:生产者消费者问题相关的知识,希望对你有一定的参考价值。

生产者消费者问题是Java并发中的常见问题之一,在实现时,一般可以考虑使用juc包下的BlockingQueue接口,至于具体使用哪个类,则就需要根据具体的使用场景具体分析了。本文主要实现一个生产者消费者的原型,以及实现一个生产者消费者的典型使用场景。

第一个问题:实现一个生产者消费者的原型。

 1 import java.util.concurrent.*;
 2 
 3 class Consumer implements Runnable {
 4     BlockingQueue q = null;
 5 
 6     public Consumer(BlockingQueue q) {
 7         this.q = q;
 8     }
 9 
10     @Override 
11     public void run() {
12         while(true) {
13             try {
14                 q.take();
15                 System.out.println("Consumer has taken a product.");
16             }catch(InterruptedException e) {
17     
18             }
19         }
20     }
21 }
22 
23 class Producer implements Runnable {
24     BlockingQueue q = null;
25     
26     public Producer(BlockingQueue q) {
27         this.q = q;
28     }
29 
30     @Override
31     public void run() {
32         while(true) {
33             try { // note that if there is any chance that block, usually we need a InterruptedException
34                 q.put(new Object());
35                 System.out.println("Producer has puted a product.");
36             }catch(InterruptedException e) {
37 
38             }
39         }
40     }
41 
42 
43 }
44 
45 public class JC_ProducerConsumerPrototype {
46     static int queueCapacity = 1024;
47     //static BlockingQueue<Object> q = new ArrayBlockingQueue<Object>(queueCapacity); // Can also compile
48     static BlockingQueue q = new ArrayBlockingQueue(queueCapacity); // ABQ must has a capacity
49     public static void main(String[] args) {
50         Thread t1 = new Thread(new Producer(q));
51         Thread t2 = new Thread(new Consumer(q));
52         t1.start();
53         t2.start();
54     }
55 
56 
57 }

 

第二个问题,现在假设生产者是在读取磁盘上的多个log文件,对于每一个文件,依次读取文件中的每一行,也就是一条log记录;消费者需要读取并分析这些记录,假设消费者是计算密集型的。如何在生产者消费者原型的基础上实现这些功能?

这个场景在server端开发中是经常碰到的,因为在Server端,不可避免地会产生大量的日志文件。

 1 import java.util.concurrent.*;
 2 import java.io.*;
 3 import java.nio.*;
 4 import java.nio.file.*;
 5 import java.util.*;
 6 import java.nio.charset.*;
 7 
 8 
 9 class Producer implements Runnable {
10     BlockingQueue q = null;
11     String fileName = null;
12     CountDownLatch latch = null;
13     
14     public Producer(BlockingQueue q,String fileName,CountDownLatch latch) {
15         this.q = q;
16         this.fileName = fileName;
17         this.latch = latch;
18     }
19 
20     @Override
21     public void run() {
22         Path path = Paths.get(".",fileName);
23         try{
24             List<String> lines = Files.readAllLines(path,StandardCharsets.UTF_8);
25             for(int i=lines.size();i>0;i--){
26                 try{
27                     q.put(lines.get(i));
28                 }catch(InterruptedException e) {
29 
30                 }
31             }
32         }catch(IOException e){
33 
34         }
35         latch.countDown();
36     }
37 }
38 
39 class Consumer implements Runnable {
40     BlockingQueue<String> q = null;
41     Boolean done = false;
42     
43     public Consumer(BlockingQueue q,Boolean done){
44         this.q = q;
45         this.done = done;
46     }
47 
48     @Override
49     public void run(){
50         while(!done||q.size()!=0){
51             try{
52                 q.take();
53             }catch(InterruptedException e){
54 
55             }
56         }
57     }
58 }
59 
60 public class JC_ProducerConsumerHandlingLog{
61     public static int fileCount = 1024;
62     public static String[] fileNames = new String[fileCount];
63     public static int cpuCount = 8;
64     public static CountDownLatch latch = new CountDownLatch(fileCount);
65     public static volatile boolean done = false;
66     public static BlockingQueue<String> q = new LinkedBlockingQueue<String>(fileCount);//one thread for one file
67 
68     public static void main(String[] args){
69         for(int i=0;i<fileCount;i++){
70             Thread t = new Thread(new Producer(q,fileNames[i],latch));
71             t.start();
72         }
73         for(int i=0;i<cpuCount;i++){//for computing tasks, we don‘t need too many threads.
74             Thread t = new Thread(new Consumer(q,done));
75             t.start();
76         }
77         try{
78             latch.await();
79             done = true;
80         }catch(InterruptedException e){
81 
82         }
83 
84     }
85 }

 

 需要稍微注意一下线程数的选择,对于计算密集型的任务,我认为线程数达到cpu的核数比较合理(在不考虑超线程的情况下,也就是说一个核只有一个线程)。有不同意见欢迎跟我交流!

以上是关于Java并发之:生产者消费者问题的主要内容,如果未能解决你的问题,请参考以下文章

java并发之生产者消费者模型

java并发:多生产者一消费者

Java多线程之并发协作生产者消费者设计模式

Java多线程之并发协作生产者消费者设计模式

Java并发程序设计设计模式与并发之生产者-消费者模式

java 多线程并发系列之 生产者消费者模式的两种实现