使用 LinkedBlockingQueue 实现简易版线程池
Posted gkmeteor
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用 LinkedBlockingQueue 实现简易版线程池相关的知识,希望对你有一定的参考价值。
前一阵子在做联系人的导入功能,使用POI组件解析Excel文件后获取到联系人列表,校验之后批量导入。单从技术层面来说,导入操作通常情况下是一个比较耗时的操作,而且如果联系人达到几万、几十万级别,必须拆分成为子任务来执行。综上,可以使用线程池来解决问题。技术选型上,没有采用已有的 ThreadPoolExecutor 框架,而使用了自制的简易版线程池。该简易版的线程池,其实也是一个简易版的【生产者-消费者】模型,任务的加入就像是生产的过程,任务的处理就像是消费的过程。我们在这里不去讨论方案的合理性,只是从技术层面总结一下在实现简易版线程池的过程中,我所学到的知识。
代码放在Github上,分享一下:https://github.com/Julius-Liu/threadpool
一、线程池设计
我们首先使用数组 ArrayList 来作为线程池的存储结构,例如数组大小为10就意味着这是一个大小为10的线程池。然后我们使用 LinkedBlockingQueue(链式阻塞队列)来存放线程的参数。示意图如下:
当线程池里的线程初始化完成后,我们希望线程都处于【饥饿】状态,随时等待参数传入,然后执行。所以,此时线程应该处于阻塞状态,如下图所示:
当我们将一个执行任务(一个参数)交给线程池以后,线程池会安排一个线程接收参数,这个线程会进入运行状态。线程执行完以后,线程又会因为参数队列为空而进入阻塞状态。某线程的执行状态如下图所示,执行完的阻塞态,如上图所示。
假设线程池中有3个线程,我们连续扔了3个参数给线程池,线程池会轮询获取线程,将参数塞给他们,然后这些线程会进入运行状态。运行完成后回归阻塞状态。如下图所示:
如下图所示,假设线程池中只有3个线程,我们连续发8个参数给线程池,那么池会轮流分配参数。线程在收到参数后就会执行。“消耗”掉一个参数后,会继续消耗下一个参数,直到参数列表为空为止。
二、为什么使用 LinkedBlockingQueue
1. BlockingQueue
我们必须先来说说为什么使用阻塞队列 BlockingQueue。BlockingQueue 队列为空时,尝试获取队头元素的操作会阻塞,一直等到队列中有元素时再返回。这个阻塞的特性,正是我们需要的,我们可以让线程一直等待元素插入,一旦插入立即执行。BlockingQueue 也支持在添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。如此一来,我们交给线程池的任务就不会丢失,哪怕超过了队列的容量。
所以我们定下方案,采用阻塞队列来作为数据结构,然后我们来调研阻塞队列常用的5种实现,看看选择哪种实现来完成线程池。
2. ArrayBlockingQueue
ArrayBlockingQueue 是一个用数组实现的有界阻塞队列,其内部按先进先出的原则对元素进行排序,其中put方法和take方法为添加和删除的阻塞方法。可以说 ArrayBlockingQueue 是 阻塞队列的最直观的实现。
3. DelayQueue
DelayQueue是一个无界阻塞队列,延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用poll()方法会返回null值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于0来判断。
DelayQueue阻塞队列在我们系统开发中也常常会用到,例如缓存系统的设计。缓存中的对象,超过了空闲时间,需要从缓存中移出;例如任务调度系统,需要准确的把握任务的执行时间。我们可能需要通过线程处理很多时间上要求很严格的数据,如果使用普通的线程,我们就需要遍历所有的对象,一个个检查看数据是否过期。首先这样在执行上的效率不会太高,其次就是这种设计的风格也大大的影响了数据的精度。一个需要12:00点执行的任务可能12:01 才执行,这样对数据要求很高的系统有更大的弊端。使用 DelayQueue 可以做到精准触发。
由上可知,延迟队列不是我们需要的阻塞队列实现。
4. LinkedBlockingQueue
LinkedBlockingQueue是一个由链表实现的有界队列阻塞队列,但大小默认值为Integer.MAX_VALUE,也可以在初始化的时候指定 capacity。和 ArrayBlockingQueue 一样,其中put方法和take方法为添加和删除的阻塞方法。
5. PriorityBlockingQueue
优先级阻塞队列通过使用堆这种数据结构实现将队列中的元素按照某种排序规则进行排序,从而改变先进先出的队列顺序,提供开发者改变队列中元素的顺序的能力。队列中的元素必须是可比较的,即实现Comparable接口,或者在构建函数时提供可对队列元素进行比较的Comparator对象。不可以放null,会报空指针异常,也不可放置无法比较的元素;add方法添加元素时,是自下而上的调整堆,取出元素时,是自上而下的调整堆顺序。
我们放入参数队列中的参数都是平级的,不涉及优先级,因此我们不考虑优先级阻塞队列。
6. SynchronousQueue
同步队列实际上不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。同步队列是轻量级的,不具有任何内部容量,我们可以用来在线程间安全的交换单一元素。
因为同步队列没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。
应用场景,我们来看一下Java并发包里的 newCachedThreadPool 方法:
1 package java.util.concurrent; 2 3 /** 4 * 带有缓存的线程池 5 */ 6 public static ExecutorService newCachedThreadPool() { 7 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 8 60L, TimeUnit.SECONDS, 9 new SynchronousQueue<Runnable>()); 10 }
Executors.newCachedThreadPool() 方法返回的 ThreadPoolExecutor 实例,其内部的阻塞队列使用的就是同步队列。由于ThreadPoolExecutor内部实现任务提交的时候调用的是工作队列的非阻塞式入队列方法(offer方法),因此,在使用同步队列作为工作队列的前提下,客户端代码向线程池提交任务时,而线程池中又没有空闲的线程能够从同步队列队列实例中取一个任务,那么相应的offer方法调用就会失败(即任务没有被存入工作队列)。此时,ThreadPoolExecutor会新建一个新的工作者线程用于对这个入队列失败的任务进行处理(假设此时线程池的大小还未达到其最大线程池大小)。
如上所述,同步队列没有内部容量来存放参数,因此我们不选择同步队列。
7. 阻塞队列选择
研究了阻塞队列的5中实现以后,候选者就在 ArrayBlockingQueue 和 LinkedBlockingQueue 两者中。其实要实现本文的简易版线程池,使用数组阻塞队列和链接阻塞队列都可以,如果你要考虑一些极端情况下的性能问题,那么透彻的研究两者的使用场景就非常有必要。数组阻塞队列和链接阻塞队列的成员变量和方法都很相似,相同点我们就先不说了。下面我们来看看两者的不同点:
- 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE)。对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
- 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
- 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
- 实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReentrantLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
三、LinkedBlockingQueue 底层方法
我们来调研一下 LinkedBlockingQueue,看看哪些变量和方法可以使用。
先来看一下 LinkedBlockingQueue 的数据结构,有一个直观的了解:
说明:
- LinkedBlockingQueue继承于AbstractQueue,它本质上是一个FIFO(先进先出)的队列。
- LinkedBlockingQueue实现了BlockingQueue接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。
- LinkedBlockingQueue是通过单链表实现的。
-
- head是链表的表头。取出数据时,都是从表头head处获取。
- last是链表的表尾。新增数据时,都是从表尾last处插入。
- count是链表的实际大小,即当前链表中包含的节点个数。
- capacity是列表的容量,它是在创建链表时指定的。
- putLock是插入锁,takeLock是取出锁;notEmpty是“非空条件”,notFull是“未满条件”。通过它们对链表进行并发控制。
我们来看一下 LinkedBlockingQueue 常用的变量:
1 // 容量 2 private final int capacity; 3 4 // 当前数量 5 private final AtomicInteger count = new AtomicInteger(0); 6 7 // 链表的表头 8 transient Node<E> head; 9 10 // 链表的表尾 11 private transient Node<E> last; 12 13 // 用于控制删除元素的【取出锁】和锁对应的【非空条件】 14 private final ReentrantLock takeLock = new ReentrantLock(); 15 private final Condition notEmpty = takeLock.newCondition(); 16 17 // 用于控制添加元素的【插入锁】和锁对应的【非满条件】 18 private final ReentrantLock putLock = new ReentrantLock(); 19 private final Condition notFull = putLock.newCondition();
这里的两把锁,takeLock 和 putLock,和两个条件,notEmpty 和 notFull 是我们考察的重点。
LinkedBlockingQueue在实现“多线程对竞争资源的互斥访问”时,对于“插入”和“取出(删除)”操作分别使用了不同的锁
- 对于插入操作,通过 putLock(插入锁)进行同步
- 对于取出操作,通过 takeLock(取出锁)进行同步
此外,插入锁putLock和notFull(非满条件)相关联,取出锁takeLock和notEmpty(非空条件)相关联。通过notFull条件和notEmpty条件更细腻的控制putLock 和 takeLock。
举例说明,若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向队列中插入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。 此外,线程A在执行取数据前,会获取takeLock,在取数据执行完毕再释放takeLock。
若某线程(线程H)要插入数据时(put操作),队列已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据之后,会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 此外,线程H在执行插入操作前,会获取putLock,在插入操作执行完毕才释放putLock。
LinkedBlockingQueue 常用函数
1 // 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue 2 LinkedBlockingQueue() 3 4 // 创建一个容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含给定 collection 的元素,元素按该 collection 迭代器的遍历顺序添加 5 LinkedBlockingQueue(Collection<? extends E> c) 6 7 // 创建一个具有给定(固定)容量的 LinkedBlockingQueue 8 LinkedBlockingQueue(int capacity) 9 10 // 从队列彻底移除所有元素 11 void clear() 12 13 // 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false 14 boolean offer(E e) 15 16 // 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用 17 boolean offer(E e, long timeout, TimeUnit unit) 18 19 // 获取但不移除此队列的头;如果此队列为空,则返回 null 20 E peek() 21 22 // 获取并移除此队列的头,如果此队列为空,则返回 null 23 E poll() 24 25 // 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要) 26 E poll(long timeout, TimeUnit unit) 27 28 // 将指定元素插入到此队列的尾部,如有队列满,则等待空间变得可用 29 void put(E e) 30 31 // 返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量 32 int remainingCapacity() 33 34 // 从此队列移除指定元素的单个实例(如果存在) 35 boolean remove(Object o) 36 37 // 返回队列中的元素个数 38 int size() 39 40 // 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要) 41 E take()
我们看到 offer(E e) 和 put(E e) 都是往队尾插入元素,poll() 和 take() 都是取出队头的元素,但是它们之间还是有细微的差别,我们接下来重点看看这4个方法的源码。
下面来看一下 offer(E e) 的源码:
1 /** 2 * 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量) 3 * 在成功时返回 true,如果此队列已满,则返回 false 4 * 如果使用了有容量限制的队列,推荐使用add方法,add方法在失败的时候只是抛出异常 5 */ 6 public boolean offer(E e) { 7 if (e == null) throw new NullPointerException(); 8 final AtomicInteger count = this.count; 9 if (count.get() == capacity) 10 // 如果队列已满,则返回false,表示插入失败 11 return false; 12 int c = -1; 13 Node<E> node = new Node<E>(e); 14 final ReentrantLock putLock = this.putLock; 15 // 获取 putLock 16 putLock.lock(); 17 try { 18 // 再次对【队列是不是满】的进行判断,如果不是满的,则插入节点 19 if (count.get() < capacity) { 20 enqueue(node); // 在队尾插入节点 21 c = count.getAndIncrement(); // 当前节点数量+1,并返回插入之前节点数量 22 if (c + 1 < capacity) 23 // 如果在插入元素之后,队列仍然未满,则唤醒notFull上的等待线程 24 notFull.signal(); 25 } 26 } finally { 27 // 释放 putLock 28 putLock.unlock(); 29 } 30 if (c == 0) 31 // 如果在插入节点前,队列为空,那么插入节点后,唤醒notEmpty上的等待线程 32 signalNotEmpty(); 33 return c >= 0; 34 }
下面来看看 put(E e) 的源码:
1 /** 2 * 将指定元素插入到此队列的尾部,如有队列满,则等待空间变得可用 3 * 4 * @throws InterruptedException {@inheritDoc} 5 * @throws NullPointerException {@inheritDoc} 6 */ 7 public void put(E e) throws InterruptedException { 8 if (e == null) throw new NullPointerException(); 9 10 int c = -1; 11 Node<E> node = new Node<E>(e); 12 final ReentrantLock putLock = this.putLock; 13 final AtomicInteger count = this.count; 14 putLock.lockInterruptibly(); // 可中断地获取 putLock 15 try { 16 // count 变量是被 putLock 和 takeLock 保护起来的,所以可以真实反映队列当前的容量情况 17 while (count.get() == capacity) { 18 notFull.await(); 19 } 20 enqueue(node); // 在队尾插入节点 21 c = count.getAndIncrement(); // 当前节点数量+1,并返回插入之前节点数量 22 if (c + 1 < capacity) 23 // 如果在插入元素之后,队列仍然未满,则唤醒notFull上的等待线程 24 notFull.signal(); 25 } finally { 26 putLock.unlock(); // 释放 putLock 27 } 28 if (c == 0) 29 // 如果在插入节点前,队列为空,那么插入节点后,唤醒notEmpty上的等待线程 30 signalNotEmpty(); 31 }
两者都用到了 signalNotEmpty(),下面来看一下源码:
1 /** 2 * 通知一个等待的take。该方法应该仅仅从put/offer调用,否则一般很难锁住takeLock 3 */ 4 private void signalNotEmpty() { 5 final ReentrantLock takeLock = this.takeLock; 6 takeLock.lock(); // 获取 takeLock 7 try { 8 notEmpty.signal(); // 唤醒notEmpty上的等待线程,意味着现在可以获取元素了 9 } finally { 10 takeLock.unlock(); // 释放 takeLock 11 } 12 }
下面来看看 poll() 的源码:
1 /** 2 * 获取并移除此队列的头,如果此队列为空,则返回 null 3 */ 4 public E poll() { 5 final AtomicInteger count = this.count; 6 if (count.get() == 0) 7 return null; 8 E x = null; 9 int c = -1; 10 final ReentrantLock takeLock = this.takeLock; 11 takeLock.lock(); // 获取 takeLock 12 try { 13 if (count.get() > 0) { 14 x = dequeue(); // 获取队头元素,并移除 15 c = count.getAndDecrement(); // 当前节点数量-1,并返回移除之前节点数量 16 if (c > 1) 17 // 如果在移除元素之后,队列中仍然有元素,则唤醒notEmpty上的等待线程 18 notEmpty.signal(); 19 } 20 } finally { 21 takeLock.unlock(); // 释放 takeLock 22 } 23 if (c == capacity) 24 // 如果在移除节点前,队列是满的,那么移除节点后,唤醒notFull上的等待线程 25 signalNotFull(); 26 return x; 27 }
下面来看看 take() 的源码:
1 /** 2 * 取出并返回队列的头。若队列为空,则一直等待 3 */ 4 public E take() throws InterruptedException { 5 E x; 6 int c = -1; 7 final AtomicInteger count = this.count; 8 final ReentrantLock takeLock = this.takeLock; 9 // 获取 takeLock,若当前线程是中断状态,则抛出InterruptedException异常 10 takeLock.lockInterruptibly(); 11 try { 12 // 若队列为空,则一直等待 13 while (count.get() == 0) { 14 notEmpty.await(); 15 } 16 x = dequeue(); // 从队头取出元素 17 c = count.getAndDecrement(); // 取出元素之后,节点数量-1;并返回移除之前的节点数量 18 if (c > 1) 19 // 如果在移除元素之后,队列中仍然有元素,则唤醒notEmpty上的等待线程 20 notEmpty.signal(); 21 } finally { 22 takeLock.unlock(); // 释放 takeLock 23 } 24 25 if (c == capacity) 26 // 如果在取出元素之前,队列是满的,就在取出元素之后,唤醒notFull上的等待线程 27 signalNotFull(); 28 return x; 29 }
两者都用到了signalNotFull(),signalNotFull()的源码如下:
1 /** 2 * 唤醒notFull上的等待线程,只能从 poll 或 take 调用 3 */ 4 private void signalNotFull() { 5 final ReentrantLock putLock = this.putLock; 6 putLock.lock(); // putLock 上锁 7 try { 8 notFull.signal(); // 唤醒notFull上的等待线程,意味着可以插入元素了 9 } finally { 10 putLock.unlock(); // putLock 解锁 11 } 12 }
从上面的4个常用函数来看,我们想要在队列为空的时候,将获取这个动作阻塞,因此我们选择【take方法】而不是【poll方法】。值得注意的是带有参数的poll方法可以更精细地控制当队列为空时,获取动作阻塞多久。在本文中我们不考虑这种做法,直接让获取操作在 notEmpty 上等待。对于插入操作,我们采用【offer方法】而不是【put方法】,前者在队列满的时候返回false,后者在队列满的时候会在 notFull 上等待。在本文中,我们把线程池做的简单一些,如果队列满就提示重试。
四、简易版线程池代码实现
具备了 LinkedBlockingQueue 的底层代码解读以后,我们来实现简易版线程池。
其实在简易版线程池初期,由于对 LinkedBlockingQueue 的底层方法不熟悉,因此对线程手动 wait 和上锁。具体来说,根据队列size的情况来决定线程是否要进入wait方法,然后在插入参数的时候,使用 synchronized 关键字锁住整个队列,再offer。这种做法,完全没有考虑已有的 takeLock,putLock,notEmpty条件和notFull条件。所以后来仔细研究了链接阻塞队列的特性,修改了线程池的实现,算是做了正确的事。
1. 注册成为 Spring Bean
我们希望在Springboot 程序启动的时候,将线程池初始化。可以使用 Spring 提供的 InitializingBean 接口的 afterPropertiesSet 方法,在所有基础属性初始化完成后,进行线程池的初始化。
1 package cn.com.gkmeteor.threadpool.utils; 2 3 @Component 4 public class ThreadPoolUtil implements InitializingBean { 5 6 public static int POOL_SIZE = 10; 7 8 @Autowired 9 private ThreadExecutorService threadExecutorService; // 具体的线程处理类 10 11 private List<ThreadWithQueue> threadpool = new ArrayList<>(); 12 13 /** 14 * 在所有基础属性初始化完成后,初始化当前类 15 * 16 * @throws Exception 17 */ 18 @Override 19 public void afterPropertiesSet() throws Exception { 20 for (int i = 0; i < POOL_SIZE; i++) { 21 ThreadWithQueue threadWithQueue = new ThreadWithQueue(i, threadExecutorService); 22 this.threadpool.add(threadWithQueue); 23 } 24 } 25 }
2. 轮询获取一个线程
我们希望将任务轮流分给线程池中的线程。要实现这个比较简单,直接两行代码搞定。
1 public static int POOL_SIZE = 10; // 线程池容量 2 index = (++index) % POOL_SIZE; // index 是当前选中的线程下标
3. 参数入队和出队,线程运行和阻塞
主要使用 queue.offer(param) 和 String param = queue.take() 这两个方法,具体来看下面的代码:
1 package cn.com.gkmeteor.threadpool.utils; 2 3 import cn.com.gkmeteor.threadpool.service.ThreadExecutorService; 4 import org.slf4j.Logger; 5 import org.slf4j.LoggerFactory; 6 7 import java.util.concurrent.BlockingQueue; 8 9 /** 10 * 带有【参数阻塞队列】的线程 11 */ 12 public class ThreadWithQueue extends Thread { 13 14 public static int CAPACITY = 10; 15 16 private Logger logger = LoggerFactory.getLogger(ThreadWithQueue.class); 17 18 private BlockingQueue<String> queue; 19 20 private ThreadExecutorService threadExecutorService; // 线程运行后的业务逻辑处理 21 22 private String threadName; 23 24 public String getThreadName() { 25 return threadName; 26 } 27 28 public void setThreadName(String threadName) { 29 this.threadName = threadName; 30 } 31 32 /** 33 * 构造方法 34 * 35 * @param i 第几个线程 36 * @param threadExecutorService 线程运行后的业务逻辑处理 37 */ 38 public ThreadWithQueue(int i, ThreadExecutorService threadExecutorService) { 39 queue = new java.util.concurrent.LinkedBlockingQueue<>(CAPACITY); 40 threadName = "Thread(" + i + ")"; 41 42 this.threadExecutorService = threadExecutorService; 43 44 this.start(); 45 } 46 47 /** 48 * 将参数放到线程的参数队列中 49 * 50 * @param param 参数 51 * @return 52 */ 53 ArrayBlockingQueue与LinkedBlockingQueue对比11.并发包阻塞队列之LinkedBlockingQueue
Java Review - 并发编程_LinkedBlockingQueue原理&源码剖析
Java Review - 并发编程_LinkedBlockingQueue原理&源码剖析