jdk源码解析--DelayQueue类
Posted 我的IT技术路
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jdk源码解析--DelayQueue类相关的知识,希望对你有一定的参考价值。
在之前我们介绍了两种阻塞队列,ArrayBlockingQueue和LinkedBlockingQueue
,本节我们要介绍一种具有特殊功能的阻塞队列,延迟队列DelayQueue。使用延迟队列需要注意一点:延迟队列是一个优先级队列,只有优先级高的执行完成才能轮到优先级低的完成。我们先通过一个demo看下怎么使用,demo中需要做一个加入后5秒的时间才能执行的任务:
1. import java.util.Calendar;
2. import java.util.Random;
3. import java.util.concurrent.DelayQueue;
4. import java.util.concurrent.Delayed;
5. import java.util.concurrent.TimeUnit;
6. import java.util.stream.IntStream;
7.
8. public class DelayQueueTest {
9. public static void main(String[] args) throws InterruptedException {
10. DelayQueue deque = new DelayQueue();//构建这个优先级队列
11. Random random = new Random();
12. IntStream.range(0,2).forEach(i-> new Thread(new Producer(deque,String.valueOf(random.nextInt(10)) )).start());
13. Thread.sleep(100);//保证10个数据已经添加到队列中
14. new Thread(new Consumer(deque)).start();//先启动一个消费者
15. }
16. //创建延迟任务
17. private static class DelayTask implements Delayed{
18. private static final long DELAY_TIME = 5000L;
19. private long addTime;
20. private String key;
21. public DelayTask(long t,String k){
22. this.addTime = t;
23. this.key = k;
24. }
25.
26. public long getAddTime() {
27. return addTime;
28. }
29.
30. public void setAddTime(long addTime) {
31. this.addTime = addTime;
32. }
33.
34. public String getKey() {
35. return key;
36. }
37.
38. public void setKey(String key) {
39. this.key = key;
40. }
41. //当getDelay<=0 的时候,就证明该任务可以执行
42. @Override
43. public long getDelay(TimeUnit unit) {
44. return unit.convert(addTime+5000-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
45. }
46. //维持着队列的顺序的比较方法
47. @Override
48. public int compareTo(Delayed o) {
49. DelayTask task =(DelayTask)o;
50. return this.key.compareTo(task.key);
51. }
52. }
53. private static class Producer implements Runnable{
54. private DelayQueue deque;
55. private String value ;
56. public Producer(DelayQueue deque,String value) {
57. this.deque = deque;
58. this.value = value;
59. }
60. @Override
61. public void run() {
62. int i =0;
63. while (i<5){
64.
65. if (deque.add(new DelayTask(System.currentTimeMillis()+(i*1000L),String.valueOf(Integer.valueOf(value)*(i+1))))){
66. System.out.println("producer "+Calendar.getInstance().getTime()+" add "+ Integer.valueOf(value)*(i+1)+" succ");
67. i++;
68. }
69. }
70.
71. }
72. }
73. private static class Consumer implements Runnable{
74. private DelayQueue deque;
75. public Consumer(DelayQueue deque) {
76. this.deque = deque;
77. }
78. @Override
79. public void run() {
80. while (true){
81. DelayTask delayTask = null;
82. try {
83. delayTask = (DelayTask) deque.take();//带有阻塞的获取
84. } catch (InterruptedException e) {
85. e.printStackTrace();
86. }
87. System.out.println("consumer "+ Calendar.getInstance().getTime() +" :"+delayTask.getKey());
88. if (deque.isEmpty()){
89. break;
90. }
91. }
92. }
93. }
94. }
看下执行的结果:
基本上隔了7秒的时间,46秒的时候全部放入,第一个开始消费的是53秒,因为上面在添加的时候加了延迟时间,延迟时间并不是看到的5秒中。从上面的也可以看到延迟队列是先按照顺序,然后按照等待时间进行处理的。
下面我们看下这个延迟队列的具体实现细节。
1. public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
2. implements BlockingQueue<E> {//注意E是继承Delayed
3. private final transient ReentrantLock lock = new ReentrantLock();//锁对戏
4. private final PriorityQueue<E> q = new PriorityQueue<E>();//底层的优先级队列
5. private Thread leader = null;//等待头部节点的线程,只有头部节点可以获取之后,后面的才能执行
6. private final Condition available = lock.newCondition();//已经可用的条件变量
7. public DelayQueue() {}
8. public DelayQueue(Collection<? extends E> c) {
9. this.addAll(c);
10. }
11. //核心方法
12. public void put(E e) {
13. offer(e);
14. }
15. public boolean offer(E e) {
16. final ReentrantLock lock = this.lock;
17. lock.lock();//加锁
18. try {
19. q.offer(e);//调用优先级队列的添加
20. if (q.peek() == e) {//如果获取到第一个元素时e的话
21. leader = null;//置空leader
22. available.signal();//唤醒其他线程可以获取
23. }
24. return true;
25. } finally {
26. lock.unlock();//释放锁
27. }
28. }
29. //取出
30. public E take() throws InterruptedException {
31. final ReentrantLock lock = this.lock;
32. lock.lockInterruptibly();//加锁
33. try {
34. for (;;) {
35. E first = q.peek();//获取第一个节点
36. if (first == null)
37. available.await();//如果为空的话,阻塞
38. else {
39. long delay = first.getDelay(NANOSECONDS);//获取第一个节点的延迟时间
40. if (delay <= 0)//如果已经可以取出的话,直接取出
41. return q.poll();
42. first = null; // 置空
43. if (leader != null)//如果是leader不为空,说明其他线程在处理
44. available.await();//阻塞当前线程
45. else {
46. Thread thisThread = Thread.currentThread();
47. leader = thisThread;//赋值leader线程
48. try {
49. available.awaitNanos(delay);//等待延迟的时间
50. } finally {
51. if (leader == thisThread)//等待完成,将leader置为null
52. leader = null;
53. }
54. }
55. }
56. }
57. } finally {
58. if (leader == null && q.peek() != null)
59. available.signal();//唤醒其他可以执行的
60. lock.unlock();//释放锁
61. }
62. }
63.
64. }
上面介绍完了延迟队列的核心方法,可以总结为:延迟队列先满足优先级,然后才是等待时间。如果取出来的数据需要等待的话,就等待相应的时间,等待完成之后就置为空,通过一个leade线程来标识。到此,本文介绍的延迟队列就完了,其他方法可自行解读。
以上是关于jdk源码解析--DelayQueue类的主要内容,如果未能解决你的问题,请参考以下文章