阻塞队列

Posted 亮子zl

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了阻塞队列相关的知识,希望对你有一定的参考价值。

阻塞队列
    当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。
    当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。
    在多线程领域:所谓阻塞,在某些情况下会挂起线程(阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。
为什么需要BlockingQueue
   好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQuere都给你手包为了
 
Iterable
   Collection
       BlockingQueue//阻塞队列  辣K印
       List
   
Collection
     Queue
     BlockingQueue
         *ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
                 方法类型    抛出异常      特殊值         阻塞          超时
                   插入         add(e)      offer(e)      put(e)        offer(e,time,unit)
                   移除         remove()    poll()        take()        poll(time,unit)
                   检查         element()   peek()        不可用        不可用
                   
         *LinkdBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列 
          PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
          DelayQueue使用优先级队列实现的延迟无界阻塞队列。 
         *SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。
                  SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。
                  每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦是然。
          LinkedTransferQueue:由链表结构组成的无界阻塞队列。
          LinkedBlockingDeque:由链表结构组成的双向阻塞队列。

生产者消费者模式:传统 

ProdConsumer_TraditionDemo
                       class ShareData
                             private int number=0;
                             private Lock lock = new ReentrantLock();
                             private Condititon condition = lock.newCondition();
                             public void increment() throws Exception
                                    lock.lock();
                                    try
                                       while(number != 0)
                                           condition.await();//等待,不能生产
                                       
                                       number++;
                                       System.out.println(Thread.currentThread()+"\\t"+number);
                                       condition.signlAll();
                                    catch(Exception e)
                                        e.printStackTrace();
                                    finally
                                        lock.unlock();
                                    
                             
                             public void decrement() throws Exception
                                    lock.lock();
                                    try
                                       while(number == 0)
                                           condition.await();//等待,不能生产
                                       
                                       number--;//干活
                                       System.out.println(Thread.currentThread()+"\\t"+number);
                                       condition.signlAll();//通知唤醒
                                    catch(Exception e)
                                        e.printStackTrace();
                                    finally
                                        lock.unlock();
                                    
                             
                       
                       public static void main(String[] args)
                             ShareData shareData=new ShareData();
                             new Thread(()->
                                  for(int i=1;i<=5;i++)
                                      try
                                         shareData.increment();
                                      catch(Eception e)
                                         e.printStackTrace();
                                      
                                  
                             ,name:"AA").start();
                             new Thread(()->
                                  for(int i=1;i<=5;i++)
                                      try
                                         shareData.decrement();
                                      catch(Eception e)
                                         e.printStackTrace();
                                      
                                  
                             ,name:"BB").start();
                       
                  阻塞队列 ProdConsumer_blockQueueDemo
                  class MyResource
                       private volatile boolean FLAG = true;
                       private AtomicInteger atomicInteger = new AtomicInteger();
                       BlockingQueue<String> blockingQueue = null;
                       public MyResource(BlockingQueue<String> blockingQueue)
                              this.blockingQueue = blockingQueue;
                              System.out.println(blockingQueue.getClass().getName());
                       
                       public void myProd() throws Exception
                              String data = null;
                              boolean retValue;
                              while(FLAG)
                                 data = atomicInteger.incrementAndGet()+"";
                                 retValue = blockingQueue.offer(data,2L,TimeUnit.SECONDS);
                                 if (retValue)
                                    System.out.println(Thread.currentThread().getName()+"\\t 插入队列"+data+"成功");
                                 else
                                    System.out.println(Thread.currentThread().getName()+"\\t 插入队列"+data+"失败");
                                 
                              
                              TimeUnit.SECONDS.sleep(timeout:1);
                       
                       System.out.println(Thread.currentThread().getName()+"\\t 叫停,表示FLAG=false,生产动作结束");
                  
                   public void myConsumer() throws Exception
                           String result = null;
                           while(FLAG)
                               result= blockingQueue.poll(2L,TimeUnit.SECONDS);
                               if(null == result || result.equalsIgnoreCase(anotherString:""))
                                   FLAG = false;
                                   System.out.println(Thread.currentThread().getName()+"\\t 超过2秒没有取到,消费退出”);
                                   System.out.println();
                                   return;
                               
                               System.out.println(Thread.currentThread().getName()+"\\t 消费队列"+result+“成功”);
                           
                   
                   public void stop()throws Exception
                        this.FLAG = false;
                   
                   public static void main(String[] args) throws Exception
                          MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
                          new Thread(()->
                               System.out.println(Thread.currentThread().getName()+"\\t 生产线程启动");
                               try
                                   myResource.myProd();
                               catch(Exception e)
                                  e.printStackTrace();
                               
                          ,name:"Prod").start();
                          new Thread(()->
                               System.out.println(Thread.currentThread().getName()+"\\t 生产线程启动");
                               try
                                   myResource.myProd();
                               catch(Exception e)
                                  e.printStackTrace();
                               
                          ,name:"Consumer").start();
                          myResource.stop();
                   

 

以上是关于阻塞队列的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程学习7-阻塞队列

使用阻塞式队列处理大数据

阻塞队列

阻塞队列

阻塞队列--概述

并发阻塞队列和非阻塞队列详解