生产者消费者模型

Posted 有悟还有迷

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了生产者消费者模型相关的知识,希望对你有一定的参考价值。

  1 public final class Data {
  2 
  3     private String id;
  4     private String name;
  5     
  6     public Data(String id, String name){
  7         this.id = id;
  8         this.name = name;
  9     }
 10     
 11     public String getId() {
 12         return id;
 13     }
 14 
 15     public void setId(String id) {
 16         this.id = id;
 17     }
 18 
 19     public String getName() {
 20         return name;
 21     }
 22 
 23     public void setName(String name) {
 24         this.name = name;
 25     }
 26 
 27     @Override
 28     public String toString(){
 29         return "{id: " + id + ", name: " + name + "}";
 30     }
 31     
 32 }
 33 
 34 
 35 /**
 36  * 生产者
 37  */
 38 public class Provider implements Runnable{
 39     
 40     //共享缓存区
 41     private BlockingQueue<Data> queue;
 42     //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
 43     private volatile boolean isRunning = true;
 44     //id生成器
 45     private static AtomicInteger count = new AtomicInteger();
 46     //随机对象
 47     private static Random r = new Random(); 
 48     
 49     public Provider(BlockingQueue queue){
 50         this.queue = queue;
 51     }
 52 
 53     @Override
 54     public void run() {
 55         while(isRunning){
 56             try {
 57                 //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时) 
 58                 Thread.sleep(r.nextInt(1000));
 59                 //获取的数据进行累计...
 60                 int id = count.incrementAndGet();
 61                 //比如通过一个getData方法获取了
 62                 Data data = new Data(Integer.toString(id), "数据" + id);
 63                 System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
 64                 /**
 65                  * quere.offer()
 66                  * 参数一:元素对象、参数二:数值、参数三:给参数二的数值定义一个时间单位
 67                  * 此时表示将Data对象加入queue中,2秒钟之内加入成功返回ture,反之返回false
 68                  */
 69                 if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
 70                     System.out.println("提交缓冲区数据失败....");
 71                     //do something... 比如重新提交
 72                 }
 73             } catch (InterruptedException e) {
 74                 e.printStackTrace();
 75             }
 76         }
 77     }
 78     
 79     public void stop(){
 80         this.isRunning = false;
 81     }
 82     
 83 }
 84 
 85 
 86 /**
 87  * 消费者
 88  */
 89 public class Consumer implements Runnable{
 90 
 91     private BlockingQueue<Data> queue;
 92 
 93     public Consumer(BlockingQueue queue){
 94         this.queue = queue;
 95     }
 96 
 97     //随机对象
 98     private static Random r = new Random();
 99 
100     @Override
101     public void run() {
102         while(true){
103             try {
104                 //获取数据
105                 Data data = this.queue.take();
106                 //进行数据处理。休眠0 - 1000毫秒模拟耗时
107                 Thread.sleep(r.nextInt(1000));
108                 System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId());
109             } catch (InterruptedException e) {
110                 e.printStackTrace();
111             }
112         }
113     }
114 }
115 
116 
117 public class Main {
118 
119     public static void main(String[] args) throws Exception {
120         //内存缓冲区 LinkedBlockingQueue阻塞无界队列
121         BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
122         //生产者
123         Provider p1 = new Provider(queue);
124         
125         Provider p2 = new Provider(queue);
126         Provider p3 = new Provider(queue);
127         //消费者
128         Consumer c1 = new Consumer(queue);
129         Consumer c2 = new Consumer(queue);
130         Consumer c3 = new Consumer(queue);
131 
132         //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程
133         //没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)
134         ExecutorService cachePool = Executors.newCachedThreadPool();
135         cachePool.execute(p1);
136         cachePool.execute(p2);
137         cachePool.execute(p3);
138         cachePool.execute(c1);
139         cachePool.execute(c2);
140         cachePool.execute(c3);
141 
142         try {
143             Thread.sleep(3000);
144         } catch (InterruptedException e) {
145             e.printStackTrace();
146         }
147         p1.stop();
148         p2.stop();
149         p3.stop();
150         try {
151             Thread.sleep(2000);
152         } catch (InterruptedException e) {
153             e.printStackTrace();
154         }        
155 //        cachePool.shutdown(); 
156 //        cachePool.shutdownNow();
157         
158 
159     }
160     
161 }

 

以上是关于生产者消费者模型的主要内容,如果未能解决你的问题,请参考以下文章

golang生产者消费者模型示例代码

LINUX多线程(生产者消费者模型,POXIS信号量)

转: Java并发编程之十三:生产者—消费者模型(含代码)

Java生产消费者模型——代码解析

生产者消费者模型

生产者消费者模型-Java代码实现