java同步阻塞队列之LinkedBlockingQueue实现原理,和ArrayBlockingQueue对比
Posted Leo Han
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java同步阻塞队列之LinkedBlockingQueue实现原理,和ArrayBlockingQueue对比相关的知识,希望对你有一定的参考价值。
上一篇我们说到ArrayBlockingQueue,底层是数组加锁机制实现同步阻塞队列,这里我们说下另外一个同步阻塞队列LinkedBlockingQueue
.
从名字上就可以看出LinkedBlockingQueue底层数据结构是基于链表结构的。我们看下其几个关键属性:
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
与ArrayBlockingQueue不同,LinkedBlockingQueue维护了两把锁,一个takeLock 和一个putLock 分别控制并发读和并发写。而对于读取和写入来说,操作的分别是链表的头部和尾部,不存在竞争关系,理论上
public LinkedBlockingQueue()
this(Integer.MAX_VALUE);
public LinkedBlockingQueue(int capacity)
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
可以看到,默认构造条件下,容量大小是Integer.MAX_VALUE
,我们接下来看看其take
和put
操作:
public void put(E e) throws InterruptedException
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try
//队列满的时候,阻塞,等待唤醒
while (count.get() == capacity)
notFull.await();
// 被其他线程唤醒,这时候获取到putLock,且队列没有满,插入元素
enqueue(node);
// 需要注意的是,c 为实际大小 -1
c = count.getAndIncrement();
// 如果队列未满,唤醒其他生产线程
if (c + 1 < capacity)
notFull.signal();
finally
putLock.unlock();
// c == 0 的时候,表名链表中元素为c+1个,这时候尝试唤醒消费者
// 这里的逻辑可能会有点绕,并不是判断只要 c > 0 就去唤醒,
// 因为消费线程阻塞的条件是链表元素个数为0,当count==0的时候,即链表元素个数为0的时候链表空了,这时候消费者会被阻塞
// 而这里生产者通过锁机制,只能一次插入一个,这时候链表是空的,插入后,链表容量为1,这时候就可以唤醒消费者了
if (c == 0)
signalNotEmpty();
public E take() throws InterruptedException
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try
while (count.get() == 0)
notEmpty.await();
x = dequeue();
c = count.getAndDecrement();
// 如果队列不为空,唤醒其他消费等待线程
if (c > 1)
notEmpty.signal();
finally
takeLock.unlock();
if (c == capacity)
signalNotFull();
return x;
我们看下,LinkedBlockingQueue中的入队和出队是如何操作的:
private void enqueue(Node<E> node)
last = last.next = node;
private E dequeue()
Node<E> h = head;
Node<E> first = h.next;
h.next = h;
head = first;
E x = first.item;
first.item = null;
return x;
LinkedBlockingQueue在构造初始化的时候,就会初始化一个空的节点:
last = head = new Node<E>(null);
然后在每次出队的时候,他的处理很微妙,并不是我们理解的将队列头元素弹出,LinkedBlockingQueue的队列的头结点永远是个空节点,如果需要出队(由于有计数器的控制),队列肯定不为空,表名head头结点肯定有下个节点,这时候并不是将头结点出队返回,而是将头结点的下一个节点设置为头结点,并且将这个节点的值取出,然后设置该节点的值为空,这样就有了一个新的空节点,出队操作的是head节点,入队操作则是last节点,这样保证了并发入队和出队的时候,二者操作的不会是同一个节点。
迭代
LinkedBlockingQueue是支持通过迭代器进行迭代处理,在迭代的时候,会同时对takeLock和putLock上锁,这时候既不能入队也不能出队。
public Iterator<E> iterator()
return new Itr();
Itr()
fullyLock();
try
current = head.next;
if (current != null)
currentElement = current.item;
finally
fullyUnlock();
public E next()
fullyLock();
try
if (current == null)
throw new NoSuchElementException();
E x = currentElement;
lastRet = current;
current = nextNode(current);
currentElement = (current == null) ? null : current.item;
return x;
finally
fullyUnlock();
void fullyLock()
putLock.lock();
takeLock.lock();
void fullyUnlock()
takeLock.unlock();
putLock.unlock();
可以看到,在使用迭代器进行迭代的时候,使用fullyLock
进行putLock
和takeLock
的上锁。
对比ArrayBlockingQueue
和ArrayBlockingQueue对比,二者提供的功能基本一致,只不过二者在底层数据结构上不一样
- ArrayBlockingQueue底层采用的是有界数组,数组大小一旦确定后后期不能在调整,且在初始化之后就分配了固定大小的数组容量空间
- LinkedBlockingQueue底层基于链表,其大小则是通过AtomicInteger来判定,但是每次入队都需要新建一个Node对象。
以上是关于java同步阻塞队列之LinkedBlockingQueue实现原理,和ArrayBlockingQueue对比的主要内容,如果未能解决你的问题,请参考以下文章
java同步阻塞队列之ArrayBlockingQueue实现原理
java同步阻塞队列之LinkedBlockingQueue实现原理,和ArrayBlockingQueue对比
java同步阻塞队列之DelayQueue实现原理,PriorityQueue原理
Java同步数据结构之ConcurrentLinkedQueue