java 阻塞队列 LinkedBlockingQueue ArrayBlockingQueue 分析

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java 阻塞队列 LinkedBlockingQueue ArrayBlockingQueue 分析相关的知识,希望对你有一定的参考价值。


BlockingQueue是阻塞队列接口类,该接口继承了Queue接口

BlockingQueue实现类常见的有以下几种。

  1. ArrayBlockingQueue:ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。


  2. DelayQueue:DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。


  3. LinkedBlockingQueue:LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限


  4. PriorityBlockingQueue:PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现


  5. SynchronousQueue:SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。


BlockingQueue接口提供了

3个添加元素方法

  1. add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常

  2. offer:添加元素到队列里,添加成功返回true,添加失败返回false

  3. put:添加元素到队列里,如果容量满了会阻塞直到容量不满

3个删除方法。

  1. poll:删除队列头部元素,如果队列为空,返回null。否则返回元素。

  2. remove:基于对象找到对应的元素,并删除。删除成功返回true,否则返回false

  3. take:删除队列头部元素,如果队列为空,一直阻塞到队列有元素并删除



例子:生产者和消费者非常适合阻塞队列,其实我也弄过redis作为生产者和消费者模式,redis的list非常适合做队列,生产者放入队列和消费者从队列里取出,同时也提供阻塞的取出等。先回归到java的阻塞队列里,用LinkedBlockingQueue来做这个例子。

package com.basic.test;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Created by sdc on 2017/6/9.
 */
public class BlockingQueueTest {

    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>(10);
        Producter producer = new Producter(blockingQueue);
        Consumer consumer = new Consumer(blockingQueue);

        //创建5个生产者,5个消费者
        for (int i = 0; i < 10; i++) {
            if (i < 5) {
                new Thread(producer, "producer" + i).start();
            } else {
                new Thread(consumer, "consumer" + (i - 5)).start();
            }
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        producer.shutDown();
        consumer.shutDown();
    }


    static class Producter implements Runnable {

        private final BlockingQueue<Integer> blockingQueue;

        private volatile boolean flag;

        private Random random;

        public Producter(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
            flag = false;
            random = new Random();

        }

        @Override
        public void run() {
            while (!flag) {
                int info = random.nextInt(100);
                try {
                    blockingQueue.put(info);
                    System.out.println(Thread.currentThread().getName() + "produce" + info);
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public void shutDown() {
            flag = true;
        }
    }

    static class Consumer implements Runnable {

        private final BlockingQueue<Integer> blockingQueue;

        private volatile boolean flag;

        public Consumer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        @Override
        public void run() {
            while (!flag) {
                int info;
                try {
                    info = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName() + " consumer " + info);
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

        public void shutDown() {
            flag = true;
        }
    }

}

LinkedBlockingQueue

LinkedBlockingQueue队列是一个使用链表完成的阻塞队列,链表是单向的。

内部用了两个锁,takeLock,putLock,添加数据和删除数据都是并行执行的,当然添加数据和删除数据的时候只能有1个线程各自执行。


// 容量大小
private final int capacity;

// 元素个数,因为有2个锁,存在竞态条件,使用AtomicInteger
private final AtomicInteger count = new AtomicInteger(0);

// 头结点
private transient Node<E> head;

// 尾节点
private transient Node<E> last;

// 拿锁
private final ReentrantLock takeLock = new ReentrantLock();

// 拿锁的条件对象
private final Condition notEmpty = takeLock.newCondition();

// 放锁
private final ReentrantLock putLock = new ReentrantLock();

// 放锁的条件对象
private final Condition notFull = putLock.newCondition();


LinkedBlockingQueue有不同的几个数据添加方法,add、offer、put方法。

offer与put都是添加元素到queue的尾部, 只不过 put 方法在队列满时会进行阻塞, 直到成功; 
而 offer 操作在容量满时直接返回 false.

public boolean offer(E e) {
    if (e == null) throw new NullPointerException(); // 不允许空元素
    final AtomicInteger count = this.count;
    if (count.get() == capacity) // 如果容量满了,返回false
        return false;
    int c = -1;
    Node<E> node = new Node(e); // 容量没满,以新元素构造节点
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // 放锁加锁,保证调用offer方法的时候只有1个线程
    try {
        if (count.get() < capacity) { // 再次判断容量是否已满,因为可能拿锁在进行消费数据,没满的话继续执行
            enqueue(node); // 节点添加到链表尾部
            c = count.getAndIncrement(); // 元素个数+1
            if (c + 1 < capacity) // 如果容量还没满
                notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满
        }
    } finally {
        putLock.unlock(); // 释放放锁,让其他线程可以调用offer方法
    }
    if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据
        signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
    return c >= 0; // 添加成功返回true,否则返回false
}
put元素是将元素添加到队列尾部,queue满时进行await,添加成功后容量还未满,则进行signal.
代码的注释中基本把操作思想都说了, 有几个注意的地方
当queue满时, 会调用 notFull.await() 进行等待, 而相应的唤醒的地方有两处, 一个是 "有线程进行
put/offer 成功后且 (c + 1) < capacity 时", 另一处是 "在线程进行 take/poll 成功 且 
(c == capacity) (PS: 这里的 c 指的是 在进行 take/poll 之前的容量)"代码中的 "signalNotEmpty" 这时在原来queue的数量 c (getAndIncrement的返回值是原来的值) 
==0 时对此时在调用 take/poll 方法的线程进行唤醒。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException(); // 不允许空元素
    int c = -1;
    Node<E> node = new Node(e); // 以新元素构造节点
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly(); // 放锁加锁,保证调用put方法的时候只有1个线程
    try {
        while (count.get() == capacity) { // 如果容量满了
            notFull.await(); // 阻塞并挂起当前线程
        }
        enqueue(node); // 节点添加到链表尾部
        c = count.getAndIncrement(); // 元素个数+1
        if (c + 1 < capacity) // 如果容量还没满
            notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满
    } finally {
        putLock.unlock(); // 释放放锁,让其他线程可以调用put方法
    }
    if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据
        signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
}
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0) // 如果元素个数为0
        return null; // 返回null
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock(); // 拿锁加锁,保证调用poll方法的时候只有1个线程
    try {
        if (count.get() > 0) { // 判断队列里是否还有数据
            x = dequeue(); // 删除头结点
            c = count.getAndDecrement(); // 元素个数-1
            if (c > 1) // 如果队列里还有元素
                notEmpty.signal(); // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费
        }
    } finally {
        takeLock.unlock(); // 释放拿锁,让其他线程可以调用poll方法
    }
    if (c == capacity) // 由于存在放锁和拿锁,这里可能放锁一直在添加数据,count会变化。这里的if条件表示如果队列中还可以再插入数据
        signalNotFull(); // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据
                return x;
}
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly(); // 拿锁加锁,保证调用take方法的时候只有1个线程
    try {
        while (count.get() == 0) { // 如果队列里已经没有元素了
            notEmpty.await(); // 阻塞并挂起当前线程
        }
        x = dequeue(); // 删除头结点
        c = count.getAndDecrement(); // 元素个数-1
        if (c > 1) // 如果队列里还有元素
            notEmpty.signal(); // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费
    } finally {
        takeLock.unlock(); // 释放拿锁,让其他线程可以调用take方法
    }
    if (c == capacity) // 由于存在放锁和拿锁,这里可能放锁一直在添加数据,count会变化。这里的if条件表示如果队列中还可以再插入数据
        signalNotFull(); // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据
    return x;
}

poll 与 take 都是获取头节点的元素, 唯一的区别是 take在queue为空时进行await, poll
则直接返回
public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock(); // remove操作要移动的位置不固定,2个锁都需要加锁
    try {
        for (Node<E> trail = head, p = trail.next; // 从链表头结点开始遍历
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) { // 判断是否找到对象
                unlink(p, trail); // 修改节点的链接信息,同时调用notFull的signal方法
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock(); // 2个锁解锁
    }
}


ArrayBlockingQueue


ArrayBlockingQueue的原理就是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm)。


ArrayBlockingQueue是一个带有长度的阻塞队列,初始化的时候必须要指定队列长度,且指定长度之后不允许进行修改。


它带有的属性如下:

// 存储队列元素的数组,是个循环数组
final Object[] items;

// 拿数据的索引,用于take,poll,peek,remove方法
int takeIndex;

// 放数据的索引,用于put,offer,add方法
int putIndex;

// 元素个数
int count;

// 可重入锁
final ReentrantLock lock;
// notEmpty条件对象,由lock创建
private final Condition notEmpty;
// notFull条件对象,由lock创建
private final Condition notFull;
数据的添加


ArrayBlockingQueue有不同的几个数据添加方法,add、offer、put方法。


add方法:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

add方法内部调用offer方法如下:

public boolean offer(E e) {
    checkNotNull(e); // 不允许元素为空
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加锁,保证调用offer方法的时候只有1个线程
    try {
        if (count == items.length) // 如果队列已满
            return false; // 直接返回false,添加失败
        else {
            insert(e); // 数组没满的话调用insert方法
            return true; // 返回true,添加成功
        }
    } finally {
        lock.unlock(); // 释放锁,让其他线程可以调用offer方法
    }
}


insert方法如下:

private void insert(E x) {
    items[putIndex] = x; // 元素添加到数组里
    putIndex = inc(putIndex); // 放数据索引+1,当索引满了变成0
    ++count; // 元素个数+1
    notEmpty.signal(); // 使用条件对象notEmpty通知,比如使用take方法的时候队列里没有数据,被阻塞。这个时候队列insert了一条数据,需要调用signal进行通知
}

put方法:

public void put(E e) throws InterruptedException {
    checkNotNull(e); // 不允许元素为空
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 加锁,保证调用put方法的时候只有1个线程
    try {
        while (count == items.length) // 如果队列满了,阻塞当前线程,并加入到条件对象notFull的等待队列里
            notFull.await(); // 线程阻塞并被挂起,同时释放锁
        insert(e); // 调用insert方法
    } finally {
        lock.unlock(); // 释放锁,让其他线程可以调用put方法
    }
}


ArrayBlockingQueue的添加数据方法有add,put,offer这3个方法,总结如下:


add方法内部调用offer方法,如果队列满了,抛出IllegalStateException异常,否则返回true


offer方法如果队列满了,返回false,否则返回true


add方法和offer方法不会阻塞线程,put方法如果队列满了会阻塞线程,直到有线程消费了队列里的数据才有可能被唤醒。


这3个方法内部都会使用可重入锁保证原子性。


数据的删除


ArrayBlockingQueue有不同的几个数据删除方法,poll、take、remove方法。


poll方法:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加锁,保证调用poll方法的时候只有1个线程
    try {
        return (count == 0) ? null : extract(); // 如果队列里没元素了,返回null,否则调用extract方法
    } finally {
        lock.unlock(); // 释放锁,让其他线程可以调用poll方法
    }
}

poll方法内部调用extract方法:

private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素
    items[takeIndex] = null; // 对应取索引上的数据清空
    takeIndex = inc(takeIndex); // 取数据索引+1,当索引满了变成0
    --count; // 元素个数-1
    notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知
    return x; // 返回元素
}

take方法:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 加锁,保证调用take方法的时候只有1个线程
    try {
        while (count == 0) // 如果队列空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里
            notEmpty.await(); // 线程阻塞并被挂起,同时释放锁
        return extract(); // 调用extract方法
    } finally {
        lock.unlock(); // 释放锁,让其他线程可以调用take方法
    }
}

remove方法:

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加锁,保证调用remove方法的时候只有1个线程
    try {
        for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍历元素
            if (o.equals(items[i])) { // 两个对象相等的话
                removeAt(i); // 调用removeAt方法
                return true; // 删除成功,返回true
            }
        }
        return false; // 删除成功,返回false
    } finally {
        lock.unlock(); // 释放锁,让其他线程可以调用remove方法
    }
}

removeAt方法:

void removeAt(int i) {
    final Object[] items = this.items;
    if (i == takeIndex) { // 如果要删除数据的索引是取索引位置,直接删除取索引位置上的数据,然后取索引+1即可
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
    } else { // 如果要删除数据的索引不是取索引位置,移动元素元素,更新取索引和放索引的值
        for (;;) {
            int nexti = inc(i);
            if (nexti != putIndex) {
                items[i] = items[nexti];
                i = nexti;
            } else {
                items[i] = null;
                putIndex = i;
                break;
            }
        }
    }
   --count; // 元素个数-1
   notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满
   ,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知 
}

  

ArrayBlockingQueue的删除数据方法有poll,take,remove这3个方法,总结如下:


poll方法对于队列为空的情况,返回null,否则返回队列头部元素。


remove方法取的元素是基于对象的下标值,删除成功返回true,否则返回false。


poll方法和remove方法不会阻塞线程。


take方法对于队列为空的情况,会阻塞并挂起当前线程,直到有数据加入到队列中。


这3个方法内部都会调用notFull.signal方法通知正在等待队列满情况下的阻塞线程。



阻塞队列常用的着两个队列都即使这样的,所以就这么个事情。


参考博文:

http://fangjian0423.github.io/2016/05/10/java-arrayblockingqueue-linkedblockingqueue-analysis/


以后会陆续补上其他的方法。


本文出自 “10093778” 博客,请务必保留此出处http://10103778.blog.51cto.com/10093778/1934068

以上是关于java 阻塞队列 LinkedBlockingQueue ArrayBlockingQueue 分析的主要内容,如果未能解决你的问题,请参考以下文章

面试刷题21:java并发工具中的队列有哪些?

java调优设置阻塞队列大小

java阻塞队列 线程同步合作

Java 阻塞队列

Java基础之线程阻塞队列

java - 阻塞队列