java阻塞队列小结
Posted PacosonSWJTU
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java阻塞队列小结相关的知识,希望对你有一定的参考价值。
【README】
1,本文介绍了java的7个阻塞队列;
2,阻塞队列的作用
- 做缓冲作用,如缓冲kafka消息,而不是直接发送给kafka,减少kafka集群的压力;
【1】阻塞队列 BlockingQueue 概述
1,队列是一种数据结构,先进先出;
2,阻塞队列的意思是:
- 当阻塞队列为空时,线程1从队列取出元素会阻塞;直到线程2向队列添加了新值;
- 但阻塞队列满时,线程1向队列添加元素会阻塞;直到线程2从队列取走了值(头部元素);
3,阻塞队列封装了 线程阻塞,线程唤醒的底层方法,不需要程序员编写这些代码,减少了开发量和代码简洁;
【2】阻塞队列介绍
【2.1】ArrayBlockingQueue 数组阻塞队列
0)类描述:
底层使用可重入锁 ReentrantLock 实现,每个操作前都会加锁,操作完成后解锁;所以仅允许一个线程串行操作阻塞队列中的任意一个方法;(线程1操作a方法,线程1退出前,线程2不能操作b方法,注意是b方法)
基于数组的有界阻塞队列;
在创建该队列时,给定队列容量 capacity,一旦创建,capacity无法修改;
此类支持 排序等待生产者和消费者线程的可选公平策略。默认情况下,不保证此顺序。但是,在公平性设置为 true 的情况下构造的队列以 FIFO 顺序授予线程访问权限。公平通常会降低吞吐量,但会减少可变性并避免饥饿。
此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选方法。
1)类定义
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
2)构造器
public ArrayBlockingQueue(int capacity)
this(capacity, false);
// 带有公平性
public ArrayBlockingQueue(int capacity, boolean fair)
if (capacity <= 0)
throw new IllegalArgumentException();
// 基于数组实现的阻塞队列
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
// 给定初始队列元素
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c)
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try
int i = 0;
try
for (E e : c)
checkNotNull(e);
items[i++] = e;
catch (ArrayIndexOutOfBoundsException ex)
throw new IllegalArgumentException();
count = i;
putIndex = (i == capacity) ? 0 : i;
finally
lock.unlock();
3)元素操作方法
- boolean add(E e):(底层调用offer(E e))插入元素e到队列;只要没有超过capacity,立即插入成功并返回true;若队列满,则抛出异常IllegalStateException ; 若元素e为空,报空指针;【不阻塞】
- boolean offer(E e):与add方法不同的是,若队列满,返回false,而不是抛出异常(add方法会抛出异常);【不阻塞】
- void put(E e) throws InterruptedException:插入元素e到队列;若队列满,则线程等待直到有可用空间;【阻塞】;线程等待可能抛中断异常;插入可能抛空指针异常,若e为null;
- boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException:若队列满,则线程等待给定时间直到有可用空间;【限时阻塞】
- E poll():从队列取出元素;若队列空,返回null;【不阻塞】
- E take() throws InterruptedException:从队列取出元素;若队列空,则线程阻塞;阻塞可能抛出中断异常;【阻塞】
- E poll(long timeout, TimeUnit unit) throws InterruptedException:从队列获取元素,若队列为空,则【限时阻塞】;
- E peek():从队列获取元素,但不出队,不阻塞,队列空获取null;
4)其他方法
- int size(): 获取队列元素个数;
- int remainingCapacity():还可以存储的元素个数;但它大于0,并不表示插入一定成功,因为有并发问题;
【2.2】LinkedBlockingQueue 链表阻塞队列
0)类描述
基于链表的可选有界队列, 可选的意思是,给定容量则有界,否则无界;
基于链接节点的可选有界阻塞队列(基于链表的可选有界队列)。 此队列对元素 FIFO(先进先出)进行排序。 队列的头部是在队列中停留时间最长的那个元素。 队列的尾部是在队列中停留时间最短的那个元素。 新元素插入队列尾部,队列检索操作获取队列头部元素。
链接队列通常比基于数组的队列具有更高的吞吐量,但在大多数并发应用程序中的可预测性较差。
可选的容量绑定构造函数参数是一种 防止队列过度扩展的方法。 如果未指定,容量等于 Integer.MAX_VALUE。 链接节点在每次插入时动态创建,除非这会使队列超过容量。
此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选方法。
此类是 Java 集合框架的成员。
1)类定义
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
2)构造器
2.1)链表节点-静态内部类
static class Node<E>
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) item = x;
// 两把锁
/** 元素获取锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 元素非空条件 */
private final Condition notEmpty = takeLock.newCondition();
/** 元素插入锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 元素非满条件 */
private final Condition notFull = putLock.newCondition();
2.2)构造器
// 构造器
public LinkedBlockingQueue()
this(Integer.MAX_VALUE);
// 给定队列大小
public LinkedBlockingQueue(int capacity)
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null); // 头节点
// 创建大小为 Integer.Max_value 的,用给定容器初始化元素的 队列
public LinkedBlockingQueue(Collection<? extends E> c)
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try
int n = 0;
for (E e : c)
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
count.set(n);
finally
putLock.unlock();
3)元素操作方法
同 ArrayBlockingQueue的元素操作方法,包括
- void put(E e) throws InterruptedException;
- boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
- boolean offer(E e)
- E take() throws InterruptedException
- E poll(long timeout, TimeUnit unit) throws InterruptedException
- E poll()
- E peek()
【2.3】PriorityBlockingQueue 优先级阻塞队列 (底层使用数组)
0)类描述
它是一个基于数组的无界阻塞队列,自动扩容保证无界;插入元素用不阻塞,获取元素阻塞(干货);
一个无界阻塞队列,它使用与 PriorityQueue 类相同的排序规则并提供阻塞检索操作。
虽然这个队列在逻辑上是无界的,但由于资源耗尽(导致 OutOfMemoryError),尝试添加可能会失败。此类不允许空元素。依赖于自然排序的优先级队列也不允许插入不可比较的对象(这样做会导致 ClassCastException)。
此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选方法。方法 iterator() 中提供的 Iterator 不能保证以任何特定顺序遍历 PriorityBlockingQueue 的元素。如果您需要有序遍历,请考虑使用 Arrays.sort(pq.toArray())。此外,drainTo 方法可用于按优先级顺序删除部分或全部元素,并将它们放置在另一个集合中。
对此类的操作不保证具有相同优先级的元素的顺序。如果您需要强制排序,您可以定义自定义类或比较器,它们使用辅助键来打破主要优先级值之间的联系。例如,这是一个将先进先出打破平局应用于可比元素的类。要使用它,您需要插入一个新的 FIFOEntry(anEntry) 而不是普通的条目对象。
1)类定义
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
2)构造器
// 创建优先级阻塞度列,默认大小 Integer.max_value, 比较器为null,默认为字典序
public PriorityBlockingQueue()
this(DEFAULT_INITIAL_CAPACITY, null);
// private static final int DEFAULT_INITIAL_CAPACITY = 11; 默认大小11
// 创建优先级阻塞度列,给定大小 , 比较器为null,默认为字典序
public PriorityBlockingQueue(int initialCapacity)
this(initialCapacity, null);
// 创建优先级阻塞度列,给定大小 ,和比较器
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator)
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
// 创建优先级阻塞队列,包含给定的集合c;若集合是 SortedSet 或 PriorityQueue,则创建后的队列的元素顺序不变,否则采用字典序
public PriorityBlockingQueue(Collection<? extends E> c)
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
boolean heapify = true; // true if not known to be in heap order
boolean screen = true; // true if must screen for nulls
if (c instanceof SortedSet<?>)
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
else if (c instanceof PriorityBlockingQueue<?>)
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
Object[] a = c.toArray();
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null))
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
this.queue = a;
this.size = n;
if (heapify)
heapify();
3)元素操作方法
- boolean add(E e) return offer(e);:插入元素,调用 offer(e) 方法; 同 offer(e)
- boolean offer(E e) : 插入元素到优先级队列。因为队列是无界的,所以只会返回true,不会返回false;且插入元素到队列永远不会阻塞(是插入不会阻塞,而不是获取元素);
- 若给定元素不能与优先级队列中的元素进行比较,抛出ClassCastException 异常;
- 给定元素为null,报空指针异常;
- void put(E e) offer(e); : 同offer(e) 方法;
- E take() throws InterruptedException:获取元素;若队列为空,阻塞等待;等待中可能抛出中断异常;
- E poll(long timeout, TimeUnit unit) throws InterruptedException:获取元素;若队列为空,阻塞等待;最多等待timeout时间,等待中可能抛出中断异常;
- E peek():仅获取队列首部元素,但首部元素不出队;
【2.4】DelayQueue - 延迟队列
0)类描述
Delayed元素的无界阻塞队列,每个元素只能在其延迟到期时被取用;
其底层使用了优先级队列来实现;
Delayed 元素的无界阻塞队列,其中每个元素只能在其延迟到期时被取用。
队列的头部是过去延迟过期最远的那个 Delayed 元素。 如果没有延迟到期,则没有 头部且 poll() 方法返回 null。
当元素的 getDelay(TimeUnit.NANOOSECONDS) 方法返回值小于或等于零的值时,就会发生过期。
尽管无法使用 take 或 poll()方法 取走(删除)未过期的元素,但它们仍被视为普通元素。 例如,size 方法返回过期和未过期元素的总个数。 此队列不允许空元素。
此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选方法。
方法 iterator() 中提供的 Iterator 不保证以任何特定顺序遍历 DelayQueue 的元素。
1)类定义
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>
// 延迟队列底层使用 优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
.......
- 1.1)Delayed 接口定义
一个混合风格的接口,用于标记在给定延迟后应采取行动的对象。
此接口的实现必须定义一个 compareTo 方法,以提供与其 getDelay 方法一致的排序。
// 延迟接口
public interface Delayed extends Comparable<Delayed>
long getDelay(TimeUnit unit);
// Comparable 接口
public interface Comparable<T>
public int compareTo(T o); // compareTo方法
- getDelay() 方法描述:
返回关联对象剩余的延迟时间;单位为unit;
2)构造器
// 创建一个空的延迟队列
public DelayQueue()
// 创建延迟队列,初始包含给定集合c
public DelayQueue(Collection<? extends E> c)
this.addAll(c);
3)元素操作方法
- boolean add(E e) return offer(e);: 插入元素e到延迟队列,同 offer(E e ) 方法;
- boolean offer(E e): 插入元素e到延迟队列;元素e为null报空指针异常;底层调用了 PriorityQueue.offer(e) 方法;
- void put(E e) offer(e); : 同offer(e);
- boolean offer(E e, long timeout, TimeUnit unit) return offer(e);: 插入元素,但永不阻塞,因为底层是优先级队列(无界);
- E poll(): 获取头部元素,若不存在没有延迟到期的元素,则返回 null。底层调用 优先级队列.poll() 方法;
- E take() throws InterruptedException: 获取头部元素,按需等待直到队列存在可用的延迟到期的元素;底层调用 优先级队列.poll() 方法;
- E poll(long timeout, TimeUnit unit) throws InterruptedException: 获取头部元素;如为空,则等待给定时间;若等待结束前扔不存在可用的延迟到期元素,则返回null;
- E peek(): 获取但不移除头部元素,若队列空,则返回null;与pool不同,若队列不存在到期元素,则返回下一个将要到期元素而不是 null,若存在下一个的话;
【2.5】SynchronousQueue 同步队列
0)类描述
一个阻塞队列,其中每个插入操作都必须等待另一个线程执行相应的移除操作,反之亦然。同步队列没有任何内部容量。
- 您无法查看同步队列,因为元素仅在您尝试删除它时才存在;
- 你不能插入一个元素(使用任何方法),除非另一个线程试图删除它;
- 你不能迭代,因为没有什么可以迭代的。
队列头部是第一个排队的插入线程试图添加到队列的元素;
如果没有这样的排队线程,则没有元素可用于删除,poll() 将返回 null。对于其他集合方法(例如包含),SynchronousQueue 充当空集合。此队列不允许空元素。
同步队列类似于 CSP 和 Ada 中使用的集合通道。
它们非常适合切换设计,在这种设计中,在一个线程中运行的对象必须与在另一个线程中运行的对象同步,以便将某些信息、事件或任务交给它。
该类支持可选的公平策略,用于对等待的生产者线程和消费者线程进行排序。默认情况下,不保证此顺序。但是,公平性设置为 true 的队列会以 FIFO 顺序授予线程访问权限(公平性,谁最先等待,谁最先操作)。
此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选方法。
1)类定义
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
2)构造器
public SynchronousQueue()
this(false);
// 若公平性fair设置为true,则等待线程根据 FIFO 顺序竞争访问
public SynchronousQueue(boolean fair)
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
3)方法描述
- void put(E e) throws InterruptedException: 把元素e添加到队列,必要时等待阻塞直到线程B接收它;元素为空抛空指针异常;等待阻塞可能抛中断异常;
- boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException: 把元素e添加到队列,必要时至多等待给定时间直到另一个线程接收它。若插入成功返回true;若消费者出现之前指定的等待时间已过,则返回false;
- boolean offer(E e):插入元素e到队列,如果正好有其他线程接收它;插入成功返回true,否则false;
- E take() throws InterruptedException: 查询并移除队列头部元素,必要时等待另一线程插入元素;
- E poll(long timeout, TimeUnit unit) throws InterruptedException: 检索并删除此队列的头部,必要时等待指定时长,以便另一个线程插入它。返回队列头部,或者在元素出现之前经过了指定的等待时间,返回 null ;
- E poll():若另一线程正在使元素可用,则检索并删除队列头部。返回队列头部,或者队列没有可用元素,返回null;
- E peek() return null; :总是返回null;
【2.6】LinkedTransferQueue 链表传输队列
0)类描述
基于链接节点的无界 TransferQueue。
该队列根据任何给定的生产者对元素 FIFO(先进先出)进行排序。队列的头部是某个生产者在队列中停留时间最长的元素。队列的尾部是某个生产者在队列中停留时间最短的那个元素。
请注意,与大多数集合不同,size 方法不是恒定时间操作。
由于这些队列的异步特性,确定当前元素的数量需要遍历所有元素,因此如果在遍历期间修改此集合,则可能会报告不准确的结果。
此外,批量操作 addAll、removeAll、retainAll、containsAll、equals 和 toArray 不能保证以原子方式执行。例如,与 addAll 操作同时运行的迭代器可能只查看一些添加的元素。
此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选方法。
与其他并发集合一样,线程中的操作在将对象放入LinkedTransferQueue之前发生——在另一个线程中的LinkedTransferQueue中访问或删除该元素之后发生操作。
1)类定义
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable
2)构造器
// 创建空链表传输队列
public LinkedTransferQueue()
// 创建包含给定集合c的链表传输队列
public LinkedTransferQueue(Collection<? extends E> c)
this();
addAll(c);
3)元素操作方法
/*
* Possible values for "how" argument in xfer method.
*/
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
- void put(E e) xfer(e, true, ASYNC, 0); : 插入元素e到队列;因为队列无界,所以永不阻塞;但元素空,抛空指针;
- boolean offer(E e): 插入元素e到队列;因为队列无界,所以只会返回true或者元素e为null抛空指针;
- boolean offer(E e, long timeout, TimeUnit unit) : 同 offer(E),timeout 和 unit 参数没有使用;
- boolean add(E e): 底层实现同 offer(E e );
- E take() throws InterruptedException: 获取元素;获取到null抛出异常;
- E poll(long timeout, TimeUnit unit) throws InterruptedException: 获取元素,最多等待给定时间;获取到null抛出异常;
- E poll() return xfer(null, false, NOW, 0);: 获取元素,元素为null不抛出异常;
- E peek() return firstDataItem(); :获取第一个元素但不出队;队列空返回null,不抛出异常;
【2.7】LinkedBlockingDeque 链表阻塞双端队列(注意是双端)
0)类描述
基于链表的可选有界阻塞双端队列。
可选的容量提供了防止队列过度扩展的方法。
如果未指定,容量等于 Integer.MAX_VALUE(默认)。 链接节点在每次插入时动态创建,除非这会使双端队列超出容量。
大多数操作在恒定时间内运行(忽略阻塞所花费的时间)。 但 remove、removeFirstOccurrence、removeLastOccurrence、contains、iterator.remove() 和批量操作,所有这些都在线性时间内运行。
此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选方法。
1)类定义
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable
2)构造器
// 默认容量的构造器
public LinkedBlockingDeque()
this(Integer.MAX_VALUE);
// 给定容量的构造器
public LinkedBlockingDeque(int capacity)
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 创建一个包含 指定容器中所有元素 的双端队列
public LinkedBlockingDeque(Collection<? extends E> c)
this(Integer.MAX_VALUE);
final ReentrantLock lock = this.lock;
lock.lock(); // Never contended, but necessary for visibility
try
for (E e : c)
if (e == null)
throw new NullPointerException();
if (!linkLast(new Node<E>(e)))
throw new IllegalStateException("Deque full");
finally
lock.unlock();
底层使用了 Node节点,包括 上一个 下一个指针;
static final class Node<E>
E item;// 节点值
// 上一个节点
Node<E> prev;
// 下一个节点
Node<E> next;
// 节点构造器
Node(E x)
item = x;
3)元素操作方法
- void addFirst(E e): 头部添加元素,底层调用offerFirst(e),队列满则抛出异常;
- void addLast(E e):操作尾部,其余同上;
- boolean offerFirst(E e) :头部添加元素; 底层调用 linkFirst(node);元素为null,抛出异常;
- boolean offerLast(E e):操作尾部,其余同上;
- void putFirst(E e) throws InterruptedException:头部添加元素; 底层调用 linkFirst(node);队列满则阻塞等待;
- void putLast(E e) throws InterruptedException:操作尾部,其余同上;
- boolean offerFirst(E e, long timeout, TimeUnit unit):头部添加元素; 底层调用 linkFirst(node);队列满则阻塞等待;但阻塞时间不超过timeout;
- boolean offerLast(E e, long timeout, TimeUnit unit):操作尾部,其余同上;
- E pollFirst():获取头部元素,底层调用 unlinkFirst();队列为空返回null;
- E pollLast():操作尾部,其余同上;
- E takeFirst() throws InterruptedException: 获取头部元素,底层调用 unlinkFirst(),队列空则阻塞;
- E takeLast() throws InterruptedException: 操作尾部,其余同上;
- E pollFirst(long timeout, TimeUnit unit): 获取头部元素,底层调用 unlinkFirst();队列为空则阻塞等待给定时间;
- E pollLast(long timeout, TimeUnit unit):操作尾部,其余同上;
- E getFirst():获取首部元素,底层调用 peekFirst(),但不出队;不阻塞;
- E getLast(): 操作尾部,其余同上;
- E peekFirst():获取首部元素,不出队,不阻塞;为空返回null;
- E peekLast() :操作尾部,其余同上;
以上是关于java阻塞队列小结的主要内容,如果未能解决你的问题,请参考以下文章