1 package unit; 2 3 import java.util.PriorityQueue; 4 5 /** 6 * 非阻塞队列 7 * @author 54304 8 * 9 */ 10 public class BlockingQueue { 11 private int queueSize = 10; 12 private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize); 13 14 public static void main(String[] args) { 15 BlockingQueue test = new BlockingQueue(); 16 Producer producer = test.new Producer(); 17 Consumer consumer = test.new Consumer(); 18 19 producer.start(); 20 consumer.start(); 21 } 22 23 class Consumer extends Thread{ 24 25 @Override 26 public void run() { 27 consume(); 28 } 29 30 private void consume() { 31 while(true){ 32 synchronized (queue) { 33 while(queue.size() == 0){ 34 try { 35 System.out.println("队列空,等待数据"); 36 queue.wait(); 37 } catch (InterruptedException e) { 38 e.printStackTrace(); 39 queue.notify(); 40 } 41 } 42 queue.poll(); //每次移走队首元素 43 queue.notify(); 44 System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素"); 45 } 46 } 47 } 48 } 49 50 class Producer extends Thread{ 51 52 @Override 53 public void run() { 54 produce(); 55 } 56 57 private void produce() { 58 while(true){ 59 synchronized (queue) { 60 while(queue.size() == queueSize){ 61 try { 62 System.out.println("队列满,等待有空余空间"); 63 queue.wait(); 64 } catch (InterruptedException e) { 65 e.printStackTrace(); 66 queue.notify(); 67 } 68 } 69 queue.offer(1); //每次插入一个元素 70 queue.notify(); 71 System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size())); 72 } 73 } 74 } 75 } 76 }