阻塞队列之二:LinkedBlockingQueue

Posted wait-pigblog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了阻塞队列之二:LinkedBlockingQueue相关的知识,希望对你有一定的参考价值。

一、简介

   已经了解过了BlockingQueue阻塞队列,这一篇就直接开始入主题LinkedBlockingQueue,这是一个链表实现的有界阻塞队列,同样按照先进先出FIFO原则存取元素,其吞吐量高于之前学习的ArrayBlockingQueue。

二、Demo和学习

   因为LinkedBlockingQueue的成员变量偏多,这边就不截图了,用代码的方式整理一下

 1     static class Node<E> {
 2         E item;
10         Node<E> next;
12         Node(E x) { item = x; }
13     }
16     private final int capacity;
19     private final AtomicInteger count = new AtomicInteger();
25     transient Node<E> head;
26 
31     private transient Node<E> last;
32 
33     /** Lock held by take, poll, etc */
34     private final ReentrantLock takeLock = new ReentrantLock();
35 
36     /** Wait queue for waiting takes */
37     private final Condition notEmpty = takeLock.newCondition();
38 
39     /** Lock held by put, offer, etc */
40     private final ReentrantLock putLock = new ReentrantLock();
41 
42     /** Wait queue for waiting puts */
43     private final Condition notFull = putLock.newCondition();

 从上面的代码可以看出LinkedBlockingQueue的实现挺复杂的。我们一一来看:

    Node:内部类也是LinkedBlockingQueue的底层原理一个个节点连接而成的链表,从其实现来看是一个单链表而E就是存放在这个链表中的元素类型

    capatical:是链表的容量

    count:是存放在链表中的当前元素的个数

    capacity:队列的容量

    putLock:入队锁

    takeLock:出队锁

内部实现了两个锁,一个put锁一个take锁JDK中本身就标明了put锁用于锁put,offer操作而take锁用于锁take,poll操作。为什么使用双锁操作,看了源码过后就能了解一二了,和ArrayBlockingQueue一样我们只看有阻塞操作的方法:

技术分享图片

LinkedBlockingQueue内部结构

put(E e):往阻塞队列中添加元素

 1   public void put(E e) throws InterruptedException {
 2         if (e == null) throw new NullPointerException();//不允许添加为null的元素
 5         int c = -1;
 6         Node<E> node = new Node<E>(e);//将添加元素组装成一个节点
 7         final ReentrantLock putLock = this.putLock;//入队锁
 8         final AtomicInteger count = this.count;//队列中的元素个数
 9         putLock.lockInterruptibly();//响应中断的锁
10         try {
19             while (count.get() == capacity) {//当队列满了
20                 notFull.await();//阻塞继续添加元素的线程
21             }
22             enqueue(node);//元素不满就入队
23             c = count.getAndIncrement();//队列元素个数增加
24             if (c + 1 < capacity)//如果队列中的元素增加后还是没到队列的容量
25                 notFull.signal();//唤醒后续添加元素的队列(唤醒一个就是进行增加一次正好到capacity容量值)
26         } finally {
27             putLock.unlock();//解锁
28         }
29         if (c == 0)//如果当前入队操作之前如果队列中存在的数量为0
30             signalNotEmpty();//唤醒获取元素的线程因为take阻塞只存在当c==0时,这样在put操作中放入一个元素就立马唤醒获取元素线程
31     }

 

signalNotEmpty():唤醒拿取元素的线程

1    private void signalNotEmpty() {
2         final ReentrantLock takeLock = this.takeLock;//获取出队锁
3         takeLock.lock();//上锁
4         try {
5             notEmpty.signal();//唤醒从队列中拿取元素的线程(每次只唤醒一个)
6         } finally {
7             takeLock.unlock();
8         }
9     }

put方法就是持续不断的往数据库进行增加元素,其逻辑不难:当队列满了以后添加元素的线程就会被阻塞,如果队列没满就添加元素,然后判断是否下一次添加就会超过这个队列的容量,如果没超过就唤醒后续添加元素的线程,当第一次添加元素的时候就开始唤醒获取元素的线程。双锁的操作就在这,put锁放入元素的时候不仅需要唤醒后续的添加元素的线程,而且当第一次添加元素以后还需要唤醒获取元素的线程。如果只是一把锁就很难区分开来无法知道何时唤醒什么锁导致获取和放入就会变得混乱。

 

offer(E e,long timeout,TimeUnit unit):往阻塞队列中添加元素

 1    public boolean offer(E e, long timeout, TimeUnit unit)
 2         throws InterruptedException {
 3         //不能添加null元素
 4         if (e == null) throw new NullPointerException();
 5         long nanos = unit.toNanos(timeout);//计算超时时间
 6         int c = -1;
 7         final ReentrantLock putLock = this.putLock;
 8         final AtomicInteger count = this.count;
 9         putLock.lockInterruptibly();//响应中断获取锁
10         try {
11             while (count.get() == capacity) {
12                 if (nanos <= 0)
13                     return false;
14                 nanos = notFull.awaitNanos(nanos);//超时阻塞
15             }
16             enqueue(new Node<E>(e));//入队一个新的节点
17             c = count.getAndIncrement();//获取自增
18             if (c + 1 < capacity)
19                 notFull.signal();
20         } finally {
21             putLock.unlock();
22         }
23         if (c == 0)
24             signalNotEmpty();
25         return true;
26     }

offer()方法大体的流程和put()方法的差距不大,只是在offer()方法内部定义了超时的时间,可以实现超时等待的功能。

 

上面都是往队列中添加元素的过程,下面就介绍一下从队列中获取元素的操作

take():从队列中获取元素

 1   public E take() throws InterruptedException {
 2         E x;
 3         int c = -1;
 4         final AtomicInteger count = this.count;//获取队列中的数量
 5         final ReentrantLock takeLock = this.takeLock;//出队锁
 6         takeLock.lockInterruptibly();//获取响应中断的锁
 7         try {
 8             while (count.get() == 0) {//队列中没元素了
 9                 notEmpty.await();//获取元素线程等待
10             }
11             x = dequeue();//获取出队的元素
12             c = count.getAndDecrement();//线程中的元素数量减一
13             if (c > 1)//如果剩余元素多余1个
14                 notEmpty.signal();//唤醒后继等待获取元素的线程
15         } finally {
16             takeLock.unlock();//解锁
17         }
18         if (c == capacity)//当队列中的获取元素操作之前的count值等于容量值
19             signalNotFull();//唤醒添加元素的线程,因为入队操作只会阻塞在队列满的时候
20         return x;
21     }

 

signalNotFull():唤醒添加元素的线程

1   private void signalNotFull() {
2         final ReentrantLock putLock = this.putLock;//获取入队锁
3         putLock.lock();//上锁
4         try {
5             notFull.signal();//唤醒存放元素的线程
6         } finally {
7             putLock.unlock();
8         }
9     }

take()方法是从队列中获取元素的操作,当队列中不存在元素时获取元素的线程都会被阻塞,当然当队列中的剩余个数还多余1个时,会唤醒等待中的获取线程继续获取元素,当队列内的现有元素个数等于队列容量的时候就唤醒入队操作线程。

 

dequeue():出队操作

 1   private E dequeue() {
 4         Node<E> h = head;//头节点
 5         Node<E> first = h.next;//头节点的下一个节点成为头节点
 6         h.next = h; // 将头节点的下一个引用值引用自己
 7         head = first;
 8         E x = first.item;
 9         first.item = null;//清除
10         return x;
11     }

 

poll(long timeout,Timeunit unit):超时获取队列中的元素

 1   public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 2         E x = null;
 3         int c = -1;
 4         long nanos = unit.toNanos(timeout);
 5         final AtomicInteger count = this.count;
 6         final ReentrantLock takeLock = this.takeLock;出队锁
 7         takeLock.lockInterruptibly();//响应中断的锁
 8         try {
 9             while (count.get() == 0) {
10                 if (nanos <= 0)
11                     return null;
12                 nanos = notEmpty.awaitNanos(nanos);//超时等待
13             }
14             x = dequeue();//出队
15             c = count.getAndDecrement();//队列中的元素数量自减
16             if (c > 1)//代表还存在可以获取的元素
17                 notEmpty.signal();//唤醒等待中的获取元素线程
18         } finally {
19             takeLock.unlock();//解锁
20         }
21         if (c == capacity)
22             signalNotFull();
23         return x;
24     }

上面的是对于LinkedBlockingQueueQueue中的一些阻塞相关的方法的了解,双锁的机制决定了LinkedBlockingQueue在并发情况下也存在较好的效率。接下来看看使用LinkedBlockingQueue的demo:

 1 package cn.memedai;
 2 
 3 import java.util.concurrent.LinkedBlockingQueue;
 4 
 5 /**
 6  * LinkedBlockingQueueDemo
 7  */
 8 public class LinkedBlockingQueueDemo {
 9     class PutThread extends Thread {
10         private LinkedBlockingQueue<Integer> lbq;
11 
12         public PutThread(LinkedBlockingQueue<Integer> lbq) {
13             this.lbq = lbq;
14         }
15 
16         @Override
17         public void run() {
18             try {
19                 for (int i = 0; i < 10; i++) {
20                     System.out.println("put" + i);
21                     lbq.put(i);//往队列中放入元素
22                     Thread.sleep(100);
23                 }
24             } catch (InterruptedException e) {
25                 e.printStackTrace();
26             }
27         }
28     }
29 
30     class GetThread extends Thread {
31         private LinkedBlockingQueue<Integer> lbq;
32 
33         public GetThread(LinkedBlockingQueue<Integer> lbq) {
34             this.lbq = lbq;
35         }
36 
37         @Override
38         public void run() {
39             try {
40                 for (int i = 0; i < 10; i++) {
41                     System.out.println("take" + lbq.take());//从队列中拿元素
42                     Thread.sleep(100);
43                 }
44             } catch (InterruptedException e) {
45                 e.printStackTrace();
46             }
47         }
48     }
49 
50     public static void main(String[] args) {
51         LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<>();
52 
53         PutThread putThread = new LinkedBlockingQueueDemo().new PutThread(lbq);
54         GetThread getThread = new LinkedBlockingQueueDemo().new GetThread(lbq);
55 
56         putThread.start();
57         getThread.start();
58     }
59 }

下面看看Demo执行的结果:

 1 put0
 2 take0
 3 put1
 4 take1
 5 put2
 6 take2
 7 put3
 8 take3
 9 put4
10 take4
11 put5
12 take5
13 put6
14 take6
15 put7
16 take7
17 put8
18 take8
19 put9
20 take9

三、总结

   LinkedBlockingQueue作为线程池中常用的实现,一般来说我们使用LinkedBlockingQueue时都需要指定它的默认值,因为不指定的时候其默认值为Integer.MAX_VALUE这个值非常的大,在并发量比较大的情况下可能会撑爆内存影响服务器的运行。这里我们可以拿ArrayBlockingQueue与LinkedBlockingQueue进行一个比较:

   第一:队列的大小有所不同,LinkedBlockingQueue有其默认值可以不指定,但是ArrayBlockingQueue必须在使用的时候指定其初始化的大小。

   第二:数据底层存储的原理不同,ArrayBlockingQueue的底层存储是通过数据实现的,LinkedBlockingQueue底层通过单向队列实现的。

   第三:锁操作不同,在ArrayBlockingQueue中只需要一把内置所就能实现阻塞入队和出队,并且每次添加都会唤醒一次等待中的获取线程而每次获取都会唤醒等待中的添加线程。而LinkedBlockingQueue底层通过双锁的机制实现,入队锁只专注于入队的操作而出队锁只专注于出队操作,两个锁之间不互斥不影响大大提高了并发的效率。

 

 

 

 

================================================================================== 

不管岁月里经历多少辛酸和艰难,告诉自己风雨本身就是一种内涵,努力的面对,不过就是一场命运的漂流,既然在路上,那么目的地必然也就是前方。


==================================================================================




以上是关于阻塞队列之二:LinkedBlockingQueue的主要内容,如果未能解决你的问题,请参考以下文章

Linux C编程之二十二 Linux线程池实现

RabbitMQ指南之二:工作队列(Work Queues)

RabbitMQ指南之二:工作队列(Work Queues)

Java数据结构学习笔记之二Java数据结构与算法之队列(Queue)实现

看图说话之二叉堆(优先队列)——原理解析

Python之路第一课Day10--随堂笔记(异步IO数据库队列缓存之二)