生产者消费者模式ArrayBlockingQueue

Posted da-peng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了生产者消费者模式ArrayBlockingQueue相关的知识,希望对你有一定的参考价值。

package concurrent._interrupt;


import java.math.BigInteger;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Demo3 {


    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<BigInteger> bq = new ArrayBlockingQueue<BigInteger>(8);
        Producter producter = new Producter(bq);
        Customer customer = new Customer(bq);

        //先在里面放入两个
        bq.put(new BigInteger("10"));
        bq.put(new BigInteger("20"));
        producter.start();
        customer.start();

    }
}
class Producter extends Thread{
    private BlockingQueue<BigInteger> bq;
    public Producter(BlockingQueue<BigInteger> bq){
        super("Producter");
        this.bq = bq;
    }

    @Override
    public void run() {
        try {
                send();
        } catch (InterruptedException e) {
            System.out.println("send函数的sleep被中断");
            e.printStackTrace();
        }
    }
    private void send() throws InterruptedException {

        BigInteger bigInteger = BigInteger.ONE;
        //关于bq的add和put函数
        //add(e)//队列未满时,返回true;队列满则抛出IllegalStateException(“Queue full”)异常——AbstractQueue
        //put(e)//队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。
        for(;;){
            Thread.sleep(2000);
            bq.put(bigInteger=bigInteger.add(bigInteger));
        }
    }
}
class Customer extends Thread{
    private BlockingQueue<BigInteger> bq;
    public Customer(BlockingQueue<BigInteger> bq){
        super("Customer");
        this.bq = bq;
    }
    @Override
    public void run() {
        try {
            for(;;){
                get();
            }

        } catch (InterruptedException e) {
            System.out.println("get函数的take()被中断");
            e.printStackTrace();
        }
    }
    private void get() throws InterruptedException {

        Thread.sleep(1000);
        BigInteger bigInteger = bq.take();
        System.out.println(bigInteger);


    }
}

结果:

10
20
2
4
8
16
32
64

 

 

中断take()方法

技术分享图片

结果:

10
20
2
4
8
16
32
get函数的take()被中断
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
    at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
    at concurrent._interrupt.Customer.get(Demo3.java:77)
    at concurrent._interrupt.Customer.run(Demo3.java:67)

 

 

再次修改中断方法:

将get空转,等待标志位的改变

class Customer extends Thread{
    private BlockingQueue<BigInteger> bq;
    public Customer(BlockingQueue<BigInteger> bq){
        super("Customer");
        this.bq = bq;
    }
    @Override
    public void run() {
        try {
            for(;!isInterrupted();){
                get();
            }

        } catch (InterruptedException e) {
            System.out.println("get函数的take()被中断");
            e.printStackTrace();
        } finally {
            System.out.println("customer的run方法结束");

        }
    }
    private void get() throws InterruptedException {

        //BigInteger bigInteger = bq.take();
        //System.out.println(bigInteger);

    }
}

结果显示:

customer的run方法结束

 

 

再次修改代码:

class Customer extends Thread{
    boolean t = true;
    private BlockingQueue<BigInteger> bq;
    public Customer(BlockingQueue<BigInteger> bq){
        super("Customer");
        this.bq = bq;
    }
    @Override
    public void run() {
        try {
            //如果没有被中断就死循环
            for(;!isInterrupted();){
                get();
                //如果被中断了就重置
                if(isInterrupted()){
                    //将打印的东西改变一下
                    t=false;
                    //重置标志位
                    interrupted();
                }
            }

        } catch (InterruptedException e) {
            System.out.println("get函数的take()被中断");
            e.printStackTrace();
        } finally {
            System.out.println("customer的run方法结束");

        }
    }
    private void get() throws InterruptedException {

        //BigInteger bigInteger = bq.take();
        //System.out.println(bigInteger);
        //刚开始的时候打印true
        //被中断,然后再重置标志后,打印false
        System.out.println(t);
    }
}

结果:

前三秒:显示true

完了主函数请求中断消费者,消费者在死循环里面检测到中断的请求之后,将请求通过interrupted()函数,重置一下。

这时候又可以执行死循环了。

三秒结束后打印了false

 

以上是关于生产者消费者模式ArrayBlockingQueue的主要内容,如果未能解决你的问题,请参考以下文章

设计模式—生产者消费者模式

实现生产者与消费者模式

Java的设计模式— 生产者-消费者模式

多线程设计模式:第三篇 - 生产者-消费者模式和读写锁模式

设计模式之生产者消费者模式

Java生产者消费者模式