Java并发多线程编程——阻塞队列(BlockingQueue)

Posted 小志的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发多线程编程——阻塞队列(BlockingQueue)相关的知识,希望对你有一定的参考价值。

一、阻塞队列(BlockingQueue)的概述

  • 阻塞队列,顾名思义,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如图所示:
  • 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。
  • 当阻塞队列是满时,往队列中添加元素的操作将会被阻塞。
  • 同样试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他线程从队列中移除一个或者多个元素或者全清空队列后使队列重新变得空闲起来并后续新增。

二、阻塞队列(BlockingQueue)的优点

  • 在多线程领域:所谓阻塞,在某些情况下会挂起线程(即线程阻塞),一旦条件满足,被挂起的线程又会被自动唤醒。
  • 我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为BlockingQueue都一手给你包办好了。
  • 在java.util.concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度.

三、阻塞队列(BlockingQueue)的架构介绍

四、阻塞队列(BlockingQueue)的分类

  • ArrayBlockingQueue: 由数组结构组成的有界阻塞队列。
  • LinkedBlockingDeque: 由链表结构组成的有界(但大小默认值Integer>MAX_VALUE)阻塞队列。
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
  • DelayQueue: 使用优先级队列实现的延迟无界阻塞队列。
  • SynchronousQueue:不存储元素的阻塞队列,也即是单个元素的队列。
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列。

五、阻塞队列(BlockingQueue)的核心方法

5.1、阻塞队列的核心方法图解

5.2、阻塞队列的核心方法(抛出异常示例)

抛出异常当阻塞队列满时,再往队列里面add插入元素会抛IllegalStateException: Queue full
抛出异常当阻塞队列空时,再往队列Remove元素时候回抛出NoSuchElementException

5.2.1 、代码示例1

  • 代码如下:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    public class BlockingQueueDemo1 
        public static void main(String[] args) 
            //创建一个由数组结构组成的有界阻塞队列,初始容量为3
            BlockingQueue<String> blockingQueue =new ArrayBlockingQueue<>(3);
            //往有界阻塞队列中添加的元素数量 = 初始容量3
            System.out.println(blockingQueue.add("a"));
            System.out.println(blockingQueue.add("b"));
            System.out.println(blockingQueue.add("c"));
        
    
    
  • 输出结果如下图:

5.2.2 、代码示例2

  • 代码如下:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    public class BlockingQueueDemo1 
        public static void main(String[] args) 
            //创建一个由数组结构组成的有界阻塞队列,初始容量为3
            BlockingQueue<String> blockingQueue =new ArrayBlockingQueue<>(3);
            System.out.println(blockingQueue.add("a"));
            System.out.println(blockingQueue.add("b"));
            System.out.println(blockingQueue.add("c"));
    
            //往有界阻塞队列中添加的元素数量 > 初始容量3
            System.out.println(blockingQueue.add("d"));
        
    
    
  • 输出结果如下图:

5.2.3 、代码示例3

  • 代码如下:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    public class BlockingQueueDemo1 
        public static void main(String[] args) 
            //创建一个由数组结构组成的有界阻塞队列,初始容量为3
            BlockingQueue<String> blockingQueue =new ArrayBlockingQueue<>(3);
            //往有界阻塞队列中添加的元素数量 = 初始容量3
            System.out.println(blockingQueue.add("a"));
            System.out.println(blockingQueue.add("b"));
            System.out.println(blockingQueue.add("c"));
    
            //从有界阻塞队列中移除的元素数量 = 初始容量3
            System.out.println(blockingQueue.remove());
            System.out.println(blockingQueue.remove());
            System.out.println(blockingQueue.remove());
        
    
    
  • 输出结果如下图:

5.2.4 、代码示例4

  • 代码如下:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    public class BlockingQueueDemo1 
        public static void main(String[] args) 
            //创建一个由数组结构组成的有界阻塞队列,初始容量为3
            BlockingQueue<String> blockingQueue =new ArrayBlockingQueue<>(3);
            //往有界阻塞队列中添加的元素数量 = 初始容量3
            System.out.println(blockingQueue.add("a"));
            System.out.println(blockingQueue.add("b"));
            System.out.println(blockingQueue.add("c"));
    
            //从有界阻塞队列中移除的元素数量 > 初始容量3
            System.out.println(blockingQueue.remove());
            System.out.println(blockingQueue.remove());
            System.out.println(blockingQueue.remove());
    
            System.out.println(blockingQueue.remove());
        
    
    
  • 输出结果如下图:

5.2.5 、代码示例5

  • 代码如下:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    public class BlockingQueueDemo1 
        public static void main(String[] args) 
            //创建一个由数组结构组成的有界阻塞队列,初始容量为3
            BlockingQueue<String> blockingQueue =new ArrayBlockingQueue<>(3);
            //往有界阻塞队列中添加的元素数量 = 初始容量3
            System.out.println(blockingQueue.add("a"));
            System.out.println(blockingQueue.add("b"));
            System.out.println(blockingQueue.add("c"));
    
            //检查队列是否为空 或者队列中队首的元素时哪个
            System.out.println("检查队列是否为空 或者队列中队首的元素时哪个===="+blockingQueue.element());	
        
    
    
  • 输出结果如下图:

5.3、阻塞队列的核心方法(特殊值示例)

特殊值插入方法,成功返回true 失败返回false
特殊值插入方法,成功返回true 失败返回false

5.3.1 、代码示例1

  • 代码如下:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    public class BlockingQueueDemo1 
        public static void main(String[] args) 
            //创建一个由数组结构组成的有界阻塞队列,初始容量为3
            BlockingQueue<String> blockingQueue =new ArrayBlockingQueue<>(3);
            //往有界阻塞队列中添加的元素数量 > 初始容量3
            System.out.println(blockingQueue.offer("a"));
            System.out.println(blockingQueue.offer("b"));
            System.out.println(blockingQueue.offer("c"));
            System.out.println(blockingQueue.offer("d"));
        
    
    
  • 输出结果如下图:

5.3.2 、代码示例2

  • 代码如下:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    public class BlockingQueueDemo1 
        public static void main(String[] args) 
            //创建一个由数组结构组成的有界阻塞队列,初始容量为3
            BlockingQueue<String> blockingQueue =new ArrayBlockingQueue<>(3);
            //往有界阻塞队列中添加的元素数量 = 初始容量3
            System.out.println(blockingQueue.offer("a"));
            System.out.println(blockingQueue.offer("b"));
            System.out.println(blockingQueue.offer("c"));
    
            //从有界阻塞队列中移除的元素数量 > 初始容量3
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll());
    
        
    
    
  • 输出结果如下图:

5.3.3 、代码示例3

  • 代码如下:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    public class BlockingQueueDemo1 
        public static void main(String[] args) 
            //创建一个由数组结构组成的有界阻塞队列,初始容量为3
            BlockingQueue<String> blockingQueue =new ArrayBlockingQueue<>(3);
            //往有界阻塞队列中添加的元素数量 = 初始容量3
            System.out.println(blockingQueue.offer("a"));
            System.out.println(blockingQueue.offer("b"));
            System.out.println(blockingQueue.offer("c"));
    
            //检查队列是否为空 或者队列中队首的元素时哪个
            System.out.println(blockingQueue.peek());
    
    
        
    
    
  • 输出结果如下图:

5.4、阻塞队列的核心方法(一直阻塞示例)

一直阻塞当阻塞队列满时,生产者继续往队列里面put元素,队列会一直阻塞直到put数据or响应中断退出
一直阻塞当阻塞队列空时,消费者试图从队列take元素,队列会一直阻塞消费者线程直到队列可用.

5.4.1 、代码示例1

  • 代码如下:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    public class BlockingQueueDemo1 
        public static void main(String[] args) throws InterruptedException 
            //创建一个由数组结构组成的有界阻塞队列,初始容量为3
            BlockingQueue<String> blockingQueue =new ArrayBlockingQueue<>(3);
            //往有界阻塞队列中添加的元素数量 = 初始容量3
            //因为无返回值,所有没有打印输出
            blockingQueue.put("a");
            blockingQueue.put("b");
            blockingQueue.put("c");
    
            System.out.println("================");
            //往有界阻塞队列中添加的元素数量 > 初始容量3 ,没有报错,一直阻塞
            blockingQueue.put("d");
        
    
    
  • 输出结果如下图:

5.4.2 、代码示例2

  • 代码如下:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    public class BlockingQueueDemo1 
        public static void main(String[] args) throws InterruptedException 
            //创建一个由数组结构组成的有界阻塞队列,初始容量为3
            BlockingQueue<String> blockingQueue =new ArrayBlockingQueue<>(3);
            //往有界阻塞队列中添加的元素数量 = 初始容量3
            //因为无返回值,所有没有打印输出
            blockingQueue.put("a");
            blockingQueue.put("b");
            blockingQueue.put("c");
    
            System.out.println("================");
            //从有界阻塞队列中移除的元素数量 > 初始容量3 ,没有报错,一直阻塞
            blockingQueue.take();
            blockingQueue.take();
            blockingQueue.take();
            blockingQueue.take();
    
        
    
    
  • 输出结果如下图:

5.5、阻塞队列的核心方法(超时退出示例)

超时退出当阻塞队列满时,队列会阻塞生产者线程一定时间,超过后限时后生产者线程就会退出

5.5.1 、代码示例1

  • 代码如下:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    public class BlockingQueueDemo1 
        public static void main(String[] args) throws InterruptedException 
            //创建一个由数组结构组成的有界阻塞队列,初始容量为3
            BlockingQueue<String> blockingQueue =new ArrayBlockingQueue<>(3);
            //往有界阻塞队列中添加的元素数量 = 初始容量3,直接添加成功
            System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
            System.out.println(blockingQueue.offer("b", 2L, TimeUnit.SECONDS));
            System.out.println(blockingQueue.offer("c", 2L, TimeUnit.SECONDS));
            //往有界阻塞队列中添加的元素数量 > 初始容量3,等待2秒钟后,添加成功
            System.out.println(blockingQueue.offer("d", 2L, TimeUnit.SECONDS));
    
    
    
        
    
    
  • 输出结果如下图:

5.5.2 、代码示例2