JDK1.8 LinkedBlockingQueue类说明
Posted zhujm320
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK1.8 LinkedBlockingQueue类说明相关的知识,希望对你有一定的参考价值。
介绍
LinkedBlockingQueue是一种基于链表实现的阻塞队列,它实现了BlockingQueue的接口,线程安全。LinkedBlockingQueue初始化时,可以不指定大小,不指定大小默认大小为Integer.MAX_VALUE,属于无界队列。使用时建议初始化一个大小,避免LinkedBlockingQueue元素过大,将系统内存耗光。
关于队列的实现原理请参考 队列实现原理和JDK1.8 BlockingQueue接口说明,链表原理参考
设计思想
在队头出队,在队尾入队,出队和入队分别采用了各自的锁,这样入队和出队可以同时操作,提供队列的吞吐量,提高了效率。实现了生产者与消费者模型。
基本功能解读
节点类(内部类)
static class Node<E>
E item;
Node<E> next;
Node(E x) item = x;
队列元素节点,该元素有一个指针指向后继节点,单向链表节点。
成员属性
队列容量,如果不设置,默认大小为Integer.MAX_VALUE
private final int capacity;
队列实际大小
private final AtomicInteger count = new AtomicInteger();
采用原子操作,由于入队和出队可以同时操作,队列实际大小可能同时会变化,则需要一个原子操作来保障数据的同步。
队列头
transient Node<E> head;
队列尾
private transient Node<E> last;
出队锁
private final ReentrantLock takeLock = new ReentrantLock();
队列非空条件
private final Condition notEmpty = takeLock.newCondition();
入队锁
private final ReentrantLock putLock = new ReentrantLock();
队列非满条件
private final Condition notFull = putLock.newCondition();
构造函数
默认构造函数
public LinkedBlockingQueue()
this(Integer.MAX_VALUE);
默认队列大小为Integer.MAX_VALUE
需要设置队列大小的构造函数
public LinkedBlockingQueue(int capacity)
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
集合初始化队列构造函数,队列大小为Integer.MAX_VALUE
public LinkedBlockingQueue(Collection<? extends E> c)
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try
int n = 0;
for (E e : c)
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
count.set(n);
finally
putLock.unlock();
基本功能介绍
通知队列非空函数
private void signalNotEmpty()
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try
notEmpty.signal();
finally
takeLock.unlock();
如果有线程进行出队操作,由于队列没有元素,该线程被阻塞了,那么下次有元素入队时,则通过该函数进行唤醒
通知队列非满函数
private void signalNotFull()
final ReentrantLock putLock = this.putLock;
putLock.lock();
try
notFull.signal();
finally
putLock.unlock();
如果有线程进行入队操作,由于队列满了,则该线程阻塞,那么下次有元素出队时,这时队列不是满的,则通过该函数进行唤醒
入队操作
阻塞式入队
public void put(E e) throws InterruptedException
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity)
notFull.await(); //队列满, 线程阻塞
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
finally
putLock.unlock();
if (c == 0)
signalNotEmpty();
如果队列满,则线程阻塞(notFull.await()调用, 线程阻塞),反之如果队列不是满的,则通过enqueue进行入队操作
private void enqueue(Node<E> node)
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
尾部入队操作
非阻塞入队
public boolean offer(E e)
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try
if (count.get() < capacity)
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
finally
putLock.unlock();
if (c == 0)
signalNotEmpty();
return c >= 0;
非阻塞入队,如果队列满,直接返回false
出队操作
阻塞式出队
public E take() throws InterruptedException
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try
while (count.get() == 0)
notEmpty.await(); //队列为空, 则线程阻塞
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
finally
takeLock.unlock();
if (c == capacity)
signalNotFull();
return x;
空队列时,调用该函数进行出队操作,则线程直接阻塞
非阻塞出队操作
public E poll()
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try
if (count.get() > 0)
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
finally
takeLock.unlock();
if (c == capacity)
signalNotFull();
return x;
非阻塞出队,队列为空时,直接返回null
查看对头元素
public E peek()
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
finally
takeLock.unlock();
查看对头元素,如果队列是空的,直接返回null
类图
以上是关于JDK1.8 LinkedBlockingQueue类说明的主要内容,如果未能解决你的问题,请参考以下文章