java Blocking Queue
Posted carl_ysz
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java Blocking Queue相关的知识,希望对你有一定的参考价值。
一、Java中的阻塞队列
在多线程之间通信中,多个线程共享一个进程所分配的资源,共享内存是一种常见的通信方式,而阻塞队列则是其实现方式的一种,例如经典的生产者-消费者模式。
A Queue that addtionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become avialable in the queue when storing an element.
阻塞队列中提供了2个操作:
队列为空时,获取元素的线程会阻塞队列一直至队列非空。
队列为满时,存储元素的线程会阻塞队列非满。
Java中的阻塞队列有7种:
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
下面进行说明
二、ArrayBlockingQueue
使用数组实现队列
2.1 构造器
public ArrayBlockingQueue(int capacity, boolean fair, @NotNull Collection<? extends E> c) //Creates an ArrayBlockingQueue with the given (fixed) capacity, the specified access policy and initially containing the elements of the given collection, added in traversal order of the collection\'s iterator.
参数说明:
capacity: 队列的容量
fair: 默认是false,如果是true的话则移除元素的顺序符合FIFO顺序,false的话没有顺序
c: 预填充元素
实现主要如下:
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
2.2 put方法实现
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
由于数组这种数据结构的特殊性,若想要线程安全的添加或者删除,都必须将整个数组锁住,因此这里实现使用一把锁, 以及2个条件队列。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
这段代码非常容易理解,注意点:
(1) 可重入锁
(2) 使用2个条件队列:Java concurrent包对多条件队列的支持,古老的wait和notify方法也可以实现,只是由于条件队列中的条件不同,必须使用notifyall(),损失了性能。
(3) 可中断锁
2.3 take() 方法说明
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
总结: ArrayBlockingQueue应该是最简单的阻塞队列实现了,由于数组结构的特殊性,使用了一把锁和2个条件队列,锁的方式是可中断锁。
三、LinkedBlockingQueue
使用链表实现队列,构造器使用方式和ArrayBlockingQueue一样
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
注意到初始化的时候创建了一个空节点...
3.1 使用2把锁实现
这里使用了2把锁,2个条件队列实现,而且在属性定义中就直接初始化了。一个存锁,一个取锁,一个非空条件队列,一个非满条件队列
private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
3.2 put的实现
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
说明:
- 除了锁之外,使用了原子变量来记录链表大小,因为使用了2把锁来锁节点,全局链表的大小使用线程安全的原子变量确实比较合适
- 此时锁的是putLock,存锁
下面看singalNotEmpty()方法:
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
发现唤醒时使用了取锁,使用了连续的锁。我们大致梳理一下顺序,在不考虑异常的情况下:
(1) 当前线程竞争存锁putLock并将其锁住
(2) 条件队列notFull(非满条件)等待
(3) 链表中插入元素
(4) 原子变量自增
(5) if(c+1<capacity),则条件队列notFull唤醒操作,通知其他的put线程,链表未达上限,依旧可以插入元素
(6) 释放putLock锁
(7) 如果c==0,原子变量修改-1->0,表明插入成功,通知其他take线程可以去取出元素
i. 当前线程竞争takeLock,并且锁住
ii. notEmpty.signal,通知其他在wait take lock的线程可以取出元素
iii. 释放takeLock
3.3 take的实现
类似,略
3.4 总结
(1) 使用了2把不同的锁
(2) 使用原子变量控制容量
(3) 使用链表数据结构
为什么可以用2把锁提升性能,减少锁竞争?
注意下面2个方法:入队方法操作的是last,出对方法操作的first,正是由于链表结构的特殊性,可以使用2把锁来提高锁粒度,从而减少锁竞争。
private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } /** * Removes a node from head of queue. * * @return the node */ private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
四、Synchronousqueue
个人感觉http://ifeve.com/java-synchronousqueue/讲的比较详细.这里引用一段话描述:
不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。
package concurrent.demo.synchronous_queue; import concurrent.demo.Utils; import java.util.concurrent.CountDownLatch; /** * <B>系统名称:</B><BR> * <B>模块名称:</B><BR> * <B>中文类名:</B><BR> * <B>概要说明:</B><BR> * * @author carl.yu * @since 2016/6/14 */ public class NativeSynchronousQueue<E> { E item = null; boolean putting = false; //保证放进去之后,其他的取走了,其他的put线程才允许取 public synchronized void put(E e) throws InterruptedException { //1. 只有一个item为null的时候才允许放 while (putting) { wait(); } putting = true; item = e; //2. 通知其他的线程来取,可能会通知错了人,造成假唤醒,所以需要用条件队列putting为true来判定,继续睡吧 notifyAll(); /*必须使用notify不能使用notifyAll*/ /*这里的notifyAll是为了让其他的take线程醒来,而不是put线程醒来哦*/ //3. 只有取完的线程才可以来拿 while (item != null) { wait(); } putting = false; notifyAll(); /*这里才是为了put线程醒来*/ } /*take比较简单,就是拿*/ public synchronized E take() throws InterruptedException { E res = null; while (item == null) { wait(); } res = item; item = null; notifyAll(); return res; } //测试 public static void main(String[] args) throws Exception { final NativeSynchronousQueue queue = new NativeSynchronousQueue(); final CountDownLatch latch = new CountDownLatch(3); Thread put01 = new Thread() { @Override public void run() { try { queue.put("1"); System.out.println("put01成功"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }; Thread put02 = new Thread() { @Override public void run() { try { queue.put("2"); System.out.println("put02成功"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }; Thread take01 = new Thread() { @Override public void run() { try { System.out.println("take成功:" + queue.take()); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }; Thread take02 = new Thread() { @Override public void run() { try { System.out.println("take成功:" + queue.take()); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }; put01.start(); Utils.sleep(10); put02.start(); Utils.sleep(10); //当没有线程take的时候,put永远不会成功 take01.start(); Utils.sleep(10); take02.start(); latch.await(); System.out.println("测试结束"); }
五、条件队列和线程池的使用
Java中,线程池的使用非常常见,jdk1.8中还新增了许多连接池,例如newWorkStealingPool用作并行计算...
这里主要强调条件队列的用法
5.1 使用有界队列实现
直接演示
定义任务类:
public class MyTask implements Runnable { private int taskId; private String taskName; public MyTask(int taskId, String taskName){ this.taskId = taskId; this.taskName = taskName; } public int getTaskId() { return taskId; } public void setTaskId(int taskId) { this.taskId = taskId; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public void run() { try { System.out.println("run taskId =" + this.taskId); Thread.sleep(5*1000); //System.out.println("end taskId =" + this.taskId); } catch (InterruptedException e) { e.printStackTrace(); } } public String toString(){ return Integer.toString(this.taskId); } }
自定义拒绝策略:
package com.bjsxt.height.concurrent018; import java.net.HttpURLConnection; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class MyRejected implements RejectedExecutionHandler{ public MyRejected(){ } public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("自定义处理.."); System.out.println("当前被拒绝任务为:" + r.toString()); } }
public static void main(String[] args) { /** * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程, * 若大于corePoolSize,则会将任务加入队列, * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程, * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。 * */ ThreadPoolExecutor pool = new ThreadPoolExecutor( 1, //coreSize 2, //MaxSize 60, //60 TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3) //指定一种队列 (有界队列) //new LinkedBlockingQueue<Runnable>() , new MyRejected() //, new DiscardOldestPolicy() ); MyTask mt1 = new MyTask(1, "任务1"); MyTask mt2 = new MyTask(2, "任务2"); MyTask mt3 = new MyTask(3, "任务3"); MyTask mt4 = new MyTask(4, "任务4"); MyTask mt5 = new MyTask(5, "任务5"); MyTask mt6 = new MyTask(6, "任务6"); pool.execute(mt1); pool.execute(mt2); pool.execute(mt3); pool.execute(mt4); pool.execute(mt5); pool.execute(mt6); pool.shutdown(); }
逐渐增加任务数,由1增加到6发现效果如下:
(1) 当任务数<1时,加入一个任务直接创建线程去处理
(2) 继续加入任务>corePoolSize=1时,会加入队列ArrayBlockingQueue
(3) 一直加入到任务数为4,队列中3个元素,队列满了
(4) 任务数继续增加,到5,会继续创建线程一直到maxPoolSize(2),因此此时执行的任务都1和5
(5) 任务数增加为6,执行拒绝策略
5.2 无界队列实现
将上述代码修改为用LinkedBlockingQueue()无界队列实现,发现此时maxSize参数没有作用,拒绝策略也没有作用,只有coreSize才有作用
(1) 当任务数<coreSize,会创建线程
(2) 当任务数>coreSize,会将任务放置到无界队列中,直到系统崩溃
(3) maxSize没有任何作用
(4) 拒绝策略没有任何作用
因此,JDK在实现FixedThreadPool时,maxSize和coreSize相等
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
5.3 Synchronousqueue实现
JDK中CachedThreadPool用这种队列实现,性能非常好
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
以上是关于java Blocking Queue的主要内容,如果未能解决你的问题,请参考以下文章
caffe代码阅读3:data_readerinternalthread以及blocking_queue的实现细节-2016.3.15
LeetCode 1188. Design Bounded Blocking Queue
caffe里的blocking_queue.hpp与.cpp干了点什么呢???
案例复现,带你分析Priority Blocking Queue比较器异常导致的NPE问题
案例复现,带你分析Priority Blocking Queue比较器异常导致的NPE问题
pandle报SystemError: (Fatal) Blocking queue is killed because the data reader raises an exception.