1 public final class Data { 2 3 private String id; 4 private String name; 5 6 public Data(String id, String name){ 7 this.id = id; 8 this.name = name; 9 } 10 11 public String getId() { 12 return id; 13 } 14 15 public void setId(String id) { 16 this.id = id; 17 } 18 19 public String getName() { 20 return name; 21 } 22 23 public void setName(String name) { 24 this.name = name; 25 } 26 27 @Override 28 public String toString(){ 29 return "{id: " + id + ", name: " + name + "}"; 30 } 31 32 } 33 34 35 /** 36 * 生产者 37 */ 38 public class Provider implements Runnable{ 39 40 //共享缓存区 41 private BlockingQueue<Data> queue; 42 //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态 43 private volatile boolean isRunning = true; 44 //id生成器 45 private static AtomicInteger count = new AtomicInteger(); 46 //随机对象 47 private static Random r = new Random(); 48 49 public Provider(BlockingQueue queue){ 50 this.queue = queue; 51 } 52 53 @Override 54 public void run() { 55 while(isRunning){ 56 try { 57 //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时) 58 Thread.sleep(r.nextInt(1000)); 59 //获取的数据进行累计... 60 int id = count.incrementAndGet(); 61 //比如通过一个getData方法获取了 62 Data data = new Data(Integer.toString(id), "数据" + id); 63 System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中..."); 64 /** 65 * quere.offer() 66 * 参数一:元素对象、参数二:数值、参数三:给参数二的数值定义一个时间单位 67 * 此时表示将Data对象加入queue中,2秒钟之内加入成功返回ture,反之返回false 68 */ 69 if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){ 70 System.out.println("提交缓冲区数据失败...."); 71 //do something... 比如重新提交 72 } 73 } catch (InterruptedException e) { 74 e.printStackTrace(); 75 } 76 } 77 } 78 79 public void stop(){ 80 this.isRunning = false; 81 } 82 83 } 84 85 86 /** 87 * 消费者 88 */ 89 public class Consumer implements Runnable{ 90 91 private BlockingQueue<Data> queue; 92 93 public Consumer(BlockingQueue queue){ 94 this.queue = queue; 95 } 96 97 //随机对象 98 private static Random r = new Random(); 99 100 @Override 101 public void run() { 102 while(true){ 103 try { 104 //获取数据 105 Data data = this.queue.take(); 106 //进行数据处理。休眠0 - 1000毫秒模拟耗时 107 Thread.sleep(r.nextInt(1000)); 108 System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId()); 109 } catch (InterruptedException e) { 110 e.printStackTrace(); 111 } 112 } 113 } 114 } 115 116 117 public class Main { 118 119 public static void main(String[] args) throws Exception { 120 //内存缓冲区 LinkedBlockingQueue阻塞无界队列 121 BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10); 122 //生产者 123 Provider p1 = new Provider(queue); 124 125 Provider p2 = new Provider(queue); 126 Provider p3 = new Provider(queue); 127 //消费者 128 Consumer c1 = new Consumer(queue); 129 Consumer c2 = new Consumer(queue); 130 Consumer c3 = new Consumer(queue); 131 132 //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程 133 //没有任务的时候不创建线程。空闲线程存活时间为60s(默认值) 134 ExecutorService cachePool = Executors.newCachedThreadPool(); 135 cachePool.execute(p1); 136 cachePool.execute(p2); 137 cachePool.execute(p3); 138 cachePool.execute(c1); 139 cachePool.execute(c2); 140 cachePool.execute(c3); 141 142 try { 143 Thread.sleep(3000); 144 } catch (InterruptedException e) { 145 e.printStackTrace(); 146 } 147 p1.stop(); 148 p2.stop(); 149 p3.stop(); 150 try { 151 Thread.sleep(2000); 152 } catch (InterruptedException e) { 153 e.printStackTrace(); 154 } 155 // cachePool.shutdown(); 156 // cachePool.shutdownNow(); 157 158 159 } 160 161 }