jdk源码解析--DelayQueue类

Posted 我的IT技术路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jdk源码解析--DelayQueue类相关的知识,希望对你有一定的参考价值。

在之前我们介绍了两种阻塞队列,ArrayBlockingQueueLinkedBlockingQueue

,本节我们要介绍一种具有特殊功能的阻塞队列,延迟队列DelayQueue。使用延迟队列需要注意一点:延迟队列是一个优先级队列,只有优先级高的执行完成才能轮到优先级低的完成。我们先通过一个demo看下怎么使用,demo中需要做一个加入后5秒的时间才能执行的任务:

1. import java.util.Calendar;  

2. import java.util.Random;  

3. import java.util.concurrent.DelayQueue;  

4. import java.util.concurrent.Delayed;  

5. import java.util.concurrent.TimeUnit;  

6. import java.util.stream.IntStream;  

7.   

8. public class DelayQueueTest {  

9.     public static void main(String[] args) throws InterruptedException {  

10.         DelayQueue deque = new DelayQueue();//构建这个优先级队列  

11.         Random random = new Random();  

12.         IntStream.range(0,2).forEach(i-> new Thread(new Producer(deque,String.valueOf(random.nextInt(10)) )).start());  

13.         Thread.sleep(100);//保证10个数据已经添加到队列中  

14.         new Thread(new Consumer(deque)).start();//先启动一个消费者  

15.     }  

16.    //创建延迟任务  

17.     private static class DelayTask implements Delayed{  

18.         private static final long DELAY_TIME = 5000L;  

19.         private long addTime;  

20.         private String key;  

21.         public DelayTask(long t,String k){  

22.             this.addTime = t;  

23.             this.key = k;  

24.         }  

25.   

26.         public long getAddTime() {  

27.             return addTime;  

28.         }  

29.   

30.         public void setAddTime(long addTime) {  

31.             this.addTime = addTime;  

32.         }  

33.   

34.         public String getKey() {  

35.             return key;  

36.         }  

37.   

38.         public void setKey(String key) {  

39.             this.key = key;  

40.         }  

41.         //当getDelay<=0 的时候,就证明该任务可以执行  

42.         @Override  

43.         public long getDelay(TimeUnit unit) {  

44.             return unit.convert(addTime+5000-System.currentTimeMillis(),TimeUnit.MILLISECONDS);  

45.         }  

46.         //维持着队列的顺序的比较方法  

47.         @Override  

48.         public int compareTo(Delayed o) {  

49.             DelayTask task =(DelayTask)o;  

50.             return this.key.compareTo(task.key);  

51.         }  

52.     }  

53.     private static class Producer implements Runnable{  

54.         private DelayQueue deque;  

55.         private String value ;  

56.         public Producer(DelayQueue deque,String value) {  

57.             this.deque = deque;  

58.             this.value = value;  

59.         }  

60.         @Override  

61.         public void run() {  

62.             int i =0;  

63.             while (i<5){  

64.   

65.                 if (deque.add(new DelayTask(System.currentTimeMillis()+(i*1000L),String.valueOf(Integer.valueOf(value)*(i+1))))){  

66.                     System.out.println("producer "+Calendar.getInstance().getTime()+" add "+ Integer.valueOf(value)*(i+1)+" succ");  

67.                     i++;  

68.                 }  

69.             }  

70.   

71.         }  

72.     }  

73.     private static class Consumer implements Runnable{  

74.         private DelayQueue deque;  

75.         public Consumer(DelayQueue deque) {  

76.             this.deque = deque;  

77.         }  

78.         @Override  

79.         public void run() {  

80.             while (true){  

81.                 DelayTask delayTask = null;  

82.                 try {  

83.                     delayTask = (DelayTask) deque.take();//带有阻塞的获取  

84.                 } catch (InterruptedException e) {  

85.                     e.printStackTrace();  

86.                 }  

87.                 System.out.println("consumer "+ Calendar.getInstance().getTime() +" :"+delayTask.getKey());  

88.                 if (deque.isEmpty()){  

89.                     break;  

90.                 }  

91.             }  

92.         }  

93.     }  

94. }  

看下执行的结果:

 


基本上隔了7秒的时间,46秒的时候全部放入,第一个开始消费的是53秒,因为上面在添加的时候加了延迟时间,延迟时间并不是看到的5秒中。从上面的也可以看到延迟队列是先按照顺序,然后按照等待时间进行处理的。

下面我们看下这个延迟队列的具体实现细节。

1. public class DelayQueue<E extends Delayed> extends AbstractQueue<E>  

2.     implements BlockingQueue<E> {//注意E是继承Delayed  

3.     private final transient ReentrantLock lock = new ReentrantLock();//锁对戏  

4.     private final PriorityQueue<E> q = new PriorityQueue<E>();//底层的优先级队列  

5.     private Thread leader = null;//等待头部节点的线程,只有头部节点可以获取之后,后面的才能执行  

6.     private final Condition available = lock.newCondition();//已经可用的条件变量  

7.     public DelayQueue() {}  

8.     public DelayQueue(Collection<? extends E> c) {  

9.         this.addAll(c);  

10.     }  

11.     //核心方法  

12.     public void put(E e) {  

13.         offer(e);  

14.     }  

15.      public boolean offer(E e) {  

16.         final ReentrantLock lock = this.lock;  

17.         lock.lock();//加锁  

18.         try {  

19.             q.offer(e);//调用优先级队列的添加  

20.             if (q.peek() == e) {//如果获取到第一个元素时e的话  

21.                 leader = null;//置空leader  

22.                 available.signal();//唤醒其他线程可以获取  

23.             }  

24.             return true;  

25.         } finally {  

26.             lock.unlock();//释放锁  

27.         }  

28.     }  

29.     //取出  

30.     public E take() throws InterruptedException {  

31.         final ReentrantLock lock = this.lock;  

32.         lock.lockInterruptibly();//加锁  

33.         try {  

34.             for (;;) {  

35.                 E first = q.peek();//获取第一个节点  

36.                 if (first == null)  

37.                     available.await();//如果为空的话,阻塞  

38.                 else {  

39.                     long delay = first.getDelay(NANOSECONDS);//获取第一个节点的延迟时间  

40.                     if (delay <= 0)//如果已经可以取出的话,直接取出  

41.                         return q.poll();  

42.                     first = null// 置空  

43.                     if (leader != null)//如果是leader不为空,说明其他线程在处理  

44.                         available.await();//阻塞当前线程  

45.                     else {  

46.                         Thread thisThread = Thread.currentThread();  

47.                         leader = thisThread;//赋值leader线程  

48.                         try {  

49.                             available.awaitNanos(delay);//等待延迟的时间  

50.                         } finally {  

51.                             if (leader == thisThread)//等待完成,将leader置为null  

52.                                 leader = null;  

53.                         }  

54.                     }  

55.                 }  

56.             }  

57.         } finally {  

58.             if (leader == null && q.peek() != null)  

59.                 available.signal();//唤醒其他可以执行的  

60.             lock.unlock();//释放锁  

61.         }  

62.     }  

63.       

64. }  

上面介绍完了延迟队列的核心方法,可以总结为:延迟队列先满足优先级,然后才是等待时间。如果取出来的数据需要等待的话,就等待相应的时间,等待完成之后就置为空,通过一个leade线程来标识。到此,本文介绍的延迟队列就完了,其他方法可自行解读。


以上是关于jdk源码解析--DelayQueue类的主要内容,如果未能解决你的问题,请参考以下文章

J.U.C并发框架源码阅读DelayQueue

jdk源码解析--LongAdder类

jdk源码解析--AbstractStringBuilder类

jdk源码解析--String 类

jdk源码解析--BitSet类

java.util.concurrent.DelayQueue 源码学习