多线程并发阻塞队列BlockingQueueConcurrentLinkedQueue场景分析
Posted 踩踩踩从踩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程并发阻塞队列BlockingQueueConcurrentLinkedQueue场景分析相关的知识,希望对你有一定的参考价值。
前言
对于多线程阻塞队列,在juc包下面提供了各种实现得队列,包括 ArrayBlockingQueue 、LinkedBlockingQueue,保证数据线程安全的情况,并使用lock锁中的single 和awaite方法来阻塞起来;以及ConcurrentLinkedQueue 队列适用于并发场景下
BlockingQueue
是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
对于阻塞队列一定有上面方法的实现
常见的BlockingQueue实现
ArrayBlockingQueue
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
源码主要实现
源代码注释
有界的队列数组。该队列对元素进行FIFO排序(先进先出)。这个队列的头是已在队列上的元素排队时间最长队列的尾部是已在队列上停留最短时间的元素。新元素插入到队列尾部,队列检索操作获取队列头部的元素。
属性分析
/** 队列 数组 */
final Object[] items;
/** 轮询、查看或删除的项目索引*/
int takeIndex;
/** 报价或添加的项目索引 */
int putIndex;
/** 队列大小 */
int count;
/** 主要锁 */
final ReentrantLock lock;
/** condition 取出等待*/
private final Condition notEmpty;
/** condition 添加等待 */
private final Condition notFull;
/*
*/
transient Itrs itrs = null;
主要属性中能分析出,队列添加有两个索引来记录删除和添加的指针。以及使用了单锁的conditon来保证阻塞等待。
这里为什么使用单锁而不使用双锁这个
- 从LinkedBlockingQueue使用了takeLock和putLock两把锁,分别用于阻塞队列的读写线程,也就是说,读线程和写线程可以同时运行,在多线程高并发场景,应该可以有更高的吞吐量,性能比单锁更高。也有可能是LinkedBlockingQueue是由链表组成操作的分别是头尾节点,相互竞争的关系较小。
- 而ArrayBlockingQueue是数组,添加和删除都是在同一个数组上,虽然也可以用两个锁但是实现上需要更多的控制
put方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock; //获取到锁
lock.lockInterruptibly();//加锁,除非线程interuptibly
try {
while (count == items.length)
notFull.await(); //队列满了进行等待
enqueue(e); //释放 notEmpty
} finally {
lock.unlock();
}
}
LinkedBlockingQueue
基于链表的阻塞队列,其内部维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时,才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据。
源码主要实现
属性分析
主要的node节点
/**
*链表节点类
*/
static class Node<E> {
E item;
/**
* One of:
* - 后继节点
* - 此节点,表示后继节点为
* -null,表示没有后续节点(这是最后一个节点)
*/
Node<E> next;
Node(E x) { item = x; }
}
基本锁 及容量大小等属性
/** 容量界限,如果没有,则为Integer.MAX_值 */
private final int capacity;
/** 当前元素数 */
private final AtomicInteger count = new AtomicInteger();
/**
*链表的头。
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* 链表尾
* Invariant: last.next == null
*/
private transient Node<E> last;
/** 被接受、投票等持有的锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 等待队列等待 */
private final Condition notEmpty = takeLock.newCondition();
/** 要约等持有的锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 等待放置的等待队列 */
private final Condition notFull = putLock.newCondition();
put方法
在此队列尾部插入指定的元素,如果空间变得可用的必要条件。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 注:所有put/take/etc中的惯例是预设本地var
// 除非设置,否则保持计数为负数表示失败
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
*注意,count在wait-guard中使用,即使它是
*没有锁的。这是有效的,因为计数可以
*仅在此点减少(所有其他PUT均关闭
*我们(或其他等待着的人)是
*如果容量发生变化,则发出信号。同样地
*用于其他等待保护中计数的所有其他用途。
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
这其中有各 signalNotEmpty();这里就是释放take锁的
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
其他的队列
DelayQueue和PriorityBlockingQueue和SynchronousQueue,在开发中用到很少,
例如SynchronousQueue 我们用的很少但在线程池中会使用到,用于创建一个 一种无缓冲的等待队列,直接执行任务
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
PriorityBlockingQueue 优先阻塞队列 会有Comparable 去比较 元素大小,自动排序。
// 可以设置比对方式
PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(5,
new Comparator<String>() {
@Override //
public int compare(String o1, String o2) {
int num1 = new Integer(o1);
int num2 = new Integer(o2);
if (num1 > num2)
return -1;
else if (num1 == num2)
return 0;
else
return 1;
}
});
queue.put("48");
queue.put("01");
queue.put("12");
queue.put("27");
queue.put("31");
for (;queue.size()>0;){
try {
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
并发队列ConcurrentLinkedQueue
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,在jdk1.5版本开发出来,采用先进先出的规则对节点进行排序,采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改。 也就是cas操作
相对于LinkedBlockingQueue在多线程下表现,更加高效,性能更好
ConcurrentLinkedQueue<String> conQueue = new ConcurrentLinkedQueue<>();
conQueue.offer("");
conQueue.poll();
源代码分析
接口实现
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable
源码注意点
/*
*这是对Michael&Scott算法的修改,适用于垃圾收集环境,支持内部节点删除(支持删除(对象))。对于解释一下,读读报纸。
*
*请注意,与此包中的大多数非阻塞算法一样,此实现依赖于以下事实:在垃圾中收集到的系统中,不存在ABA问题的可能性要回收节点,则无需使用“计数”“指针”或在中使用的版本中看到的相关技术非GC设置。
*
*基本不变量是:
*-只有一个(最后一个)节点的下一个引用为空,排队时会出现这种情况。最后一个节点可以是在O(1)时间内从尾部到达,但尾部只是一个优化-从头也一样。
*-队列中包含的元素是队列中的非空项可从头部到达的节点。包装该项目将节点引用为null会自动将其从排队。必须保持所有元素从head的可达性即使在导致前进。退出队列的节点可能仍在使用中
*由于创建迭代器或简单的已丢失其时间片的poll()。
*
*以上可能暗示所有节点都是GC可访问的
*来自前一个已退出队列的节点。这将导致两个问题:
-允许恶意迭代器导致无限内存保留
*-在以下情况下导致旧节点跨代链接到新节点:
*一个节点在活动时被保留,这一代GCs具有处理困难,导致重复重大收集。
*但是,只有未删除的节点才需要可以从中访问退出队列的节点,不一定要具有可达性属于总承包商理解的类型。我们使用将刚退出队列的节点链接到自身。这样的
*“自我链接”隐含着向头部前进的意思。
*
*头部和尾部都允许滞后。事实上每次更新都是一个重要的优化(病例较少)。与LinkedTransferQueue一样(请参阅内部该类的文档),我们使用两个松弛阈值;也就是说,当当前指针出现时,我们更新head/tail
*距离第一个/最后一个节点两步或两步以上。
*
*由于head和tail是同时独立更新的,
*尾巴有可能落后于头部(为什么不)?
*
*将节点的项引用以原子方式封装为null将删除元素从队列中删除。迭代器跳过带有null的节点项目。该类以前的实现在poll()和remove(Object)中会出现相同元素的位置
*要通过两个并发操作成功删除。这个方法remove(Object)也会延迟取消已删除节点的链接,但这仅仅是一种优化。
*
*在构造节点时(排队前),我们避免支付费用对于易失性写入项,改为使用Unsafe.putObject一种正常的书写方式。这使得排队的成本得以降低“一个半”案例。
*
*头部和尾部都可能指向或不指向具有非空项。如果队列为空,则所有项目当然必须为空不能为空。创建时,头部和尾部都指虚拟对象具有空项的节点。头部和尾部仅使用更新
*CAS,所以它们从不倒退,尽管这只是一个简单的过程优化。
*/
属性
private static class Node<E> {
volatile E item;
volatile Node<E> next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
这里jdk通过unsafe.getunsafe去获取cas操作是可以的,而我们通过他的方法去拿,则会报一个安全机制的错误。只有通过反射去拿去
offer方法
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p 是最后节点
if (p.casNext(null, newNode)) {
//成功的CAS是线性化点
//要使e成为此队列的一个元素,
//让newNode成为“活的”。
if (p != t) // 一次跳过两个节点
casTail(t, newNode); // 失败是可以的。
return true;
}
// 丢失CAS比赛到另一个线程;下次重读
}
else if (p == q)
//我们从名单上掉了下来。如果尾不变,它将
//也将被排除在名单之外,在这种情况下,我们需要
//跳转到头部,所有活动节点始终从该头部跳转
//可达的。否则,新尾巴是更好的选择。
p = (t != (t = tail)) ? t : head;
else
// 两次跳跃后检查尾部更新。
p = (p != t && t != (t = tail)) ? t : q;
}
}
这里全是使用的cas操作数据,因此这才能达到,高性能,但是cpu的消耗冲突是非常大的
以上是关于多线程并发阻塞队列BlockingQueueConcurrentLinkedQueue场景分析的主要内容,如果未能解决你的问题,请参考以下文章
Java并发多线程编程——阻塞队列(BlockingQueue)
多线程并发阻塞队列BlockingQueueConcurrentLinkedQueue场景分析