生产者消费者模式ArrayBlockingQueue
Posted da-peng
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了生产者消费者模式ArrayBlockingQueue相关的知识,希望对你有一定的参考价值。
package concurrent._interrupt; import java.math.BigInteger; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class Demo3 { public static void main(String[] args) throws InterruptedException { BlockingQueue<BigInteger> bq = new ArrayBlockingQueue<BigInteger>(8); Producter producter = new Producter(bq); Customer customer = new Customer(bq); //先在里面放入两个 bq.put(new BigInteger("10")); bq.put(new BigInteger("20")); producter.start(); customer.start(); } } class Producter extends Thread{ private BlockingQueue<BigInteger> bq; public Producter(BlockingQueue<BigInteger> bq){ super("Producter"); this.bq = bq; } @Override public void run() { try { send(); } catch (InterruptedException e) { System.out.println("send函数的sleep被中断"); e.printStackTrace(); } } private void send() throws InterruptedException { BigInteger bigInteger = BigInteger.ONE; //关于bq的add和put函数 //add(e)//队列未满时,返回true;队列满则抛出IllegalStateException(“Queue full”)异常——AbstractQueue //put(e)//队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。 for(;;){ Thread.sleep(2000); bq.put(bigInteger=bigInteger.add(bigInteger)); } } } class Customer extends Thread{ private BlockingQueue<BigInteger> bq; public Customer(BlockingQueue<BigInteger> bq){ super("Customer"); this.bq = bq; } @Override public void run() { try { for(;;){ get(); } } catch (InterruptedException e) { System.out.println("get函数的take()被中断"); e.printStackTrace(); } } private void get() throws InterruptedException { Thread.sleep(1000); BigInteger bigInteger = bq.take(); System.out.println(bigInteger); } }
结果:
10 20 2 4 8 16 32 64
中断take()方法
结果:
10 20 2 4 8 16 32 get函数的take()被中断 java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403) at concurrent._interrupt.Customer.get(Demo3.java:77) at concurrent._interrupt.Customer.run(Demo3.java:67)
再次修改中断方法:
将get空转,等待标志位的改变
class Customer extends Thread{ private BlockingQueue<BigInteger> bq; public Customer(BlockingQueue<BigInteger> bq){ super("Customer"); this.bq = bq; } @Override public void run() { try { for(;!isInterrupted();){ get(); } } catch (InterruptedException e) { System.out.println("get函数的take()被中断"); e.printStackTrace(); } finally { System.out.println("customer的run方法结束"); } } private void get() throws InterruptedException { //BigInteger bigInteger = bq.take(); //System.out.println(bigInteger); } }
结果显示:
customer的run方法结束
再次修改代码:
class Customer extends Thread{ boolean t = true; private BlockingQueue<BigInteger> bq; public Customer(BlockingQueue<BigInteger> bq){ super("Customer"); this.bq = bq; } @Override public void run() { try { //如果没有被中断就死循环 for(;!isInterrupted();){ get(); //如果被中断了就重置 if(isInterrupted()){ //将打印的东西改变一下 t=false; //重置标志位 interrupted(); } } } catch (InterruptedException e) { System.out.println("get函数的take()被中断"); e.printStackTrace(); } finally { System.out.println("customer的run方法结束"); } } private void get() throws InterruptedException { //BigInteger bigInteger = bq.take(); //System.out.println(bigInteger); //刚开始的时候打印true //被中断,然后再重置标志后,打印false System.out.println(t); } }
结果:
前三秒:显示true
完了主函数请求中断消费者,消费者在死循环里面检测到中断的请求之后,将请求通过interrupted()函数,重置一下。
这时候又可以执行死循环了。
三秒结束后打印了false
以上是关于生产者消费者模式ArrayBlockingQueue的主要内容,如果未能解决你的问题,请参考以下文章