Netty HashedWheelTimer Source Code Analysis

Posted by code4m


Preface: This article was compiled by the editors of 小常识网 (cha138.com). It mainly covers the source code of Netty's HashedWheelTimer; we hope you find it useful.

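Netty provides a simple timing-wheel timer implementation based on the paper "Hashed and Hierarchical Timing Wheels: Data Structures to Efficiently Implement a Timer Facility". It can be used for timeout detection. This article analyzes it from the source code.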

Source Code Analysis

  • Netty Version 4.0.24

How It Works

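HashedWheelTimer consists of ticksPerWheel buckets (ticksPerWheel is rounded up to a power of two) arranged in a ring. Whenever the user submits a timer task, the timer computes which bucket it belongs to and places it there. Internally, HashedWheelTimer runs a tick clock that ticks once every tickDuration. Each tick checks the timeouts of only one bucket for expiration, and the next tick moves on to the next bucket.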
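To make the ring concrete, the sketch below shows the arithmetic that maps a deadline onto a bucket. It is a hypothetical helper, not Netty code, and it assumes that times are nanoseconds measured from the timer's start, that the wheel length is a power of two, and (in main) an 8-bucket wheel with a 100 ms tick.

```java
/** Hypothetical helper (not Netty code) illustrating how a timing wheel maps deadlines to buckets. */
final class WheelMath {
    private WheelMath() {}

    /** Absolute tick number at which a deadline (nanos since timer start) falls due. */
    static long ticksFor(long deadlineNanos, long tickDurationNanos) {
        return deadlineNanos / tickDurationNanos;
    }

    /** Bucket that holds the timeout; wheelLength must be a power of two. */
    static int bucketIndex(long ticks, int wheelLength) {
        return (int) (ticks & (wheelLength - 1));
    }

    /** Full turns of the wheel that must pass before the timeout may fire. */
    static long remainingRounds(long ticks, long currentTick, int wheelLength) {
        return (ticks - currentTick) / wheelLength;
    }

    public static void main(String[] args) {
        long tickDuration = 100_000_000L; // 100 ms per tick
        long deadline = 1_250_000_000L;   // due 1.25 s after the timer started
        long ticks = ticksFor(deadline, tickDuration);
        // prints "12 4 1"
        System.out.println(ticks + " " + bucketIndex(ticks, 8) + " " + remainingRounds(ticks, 0, 8));
    }
}
```

Here a 1.25 s timeout submitted at tick 0 lands in bucket 4 with one remaining round: the cursor passes bucket 4 at tick 4 (the round count drops to 0) and the timeout fires on the next visit, at tick 12.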

Initialization

```java
/**
 * tickDuration determines the timer's precision: the smaller the value, the higher the precision. Default: 100 ms.
 * ticksPerWheel determines the number of buckets: the smaller the value, the more timeouts may share a bucket. Default: 512.
 */
public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel) {

    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    if (tickDuration <= 0) {
        throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
    }
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }

    // Normalize ticksPerWheel to power of two and initialize the wheel.
    // The number of buckets is fixed, so all of them are created up front.
    wheel = createWheel(ticksPerWheel);
    mask = wheel.length - 1;

    // Convert tickDuration to nanos.
    this.tickDuration = unit.toNanos(tickDuration);

    // Prevent overflow.
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                tickDuration, Long.MAX_VALUE / wheel.length));
    }
    // The internal timeout-checking thread (a single thread).
    workerThread = threadFactory.newThread(worker);

    leak = leakDetector.open(this);
}
```
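The constructor depends on the wheel length being a power of two, so that `tick & mask` (with mask = wheel.length - 1) can replace the slower `tick % wheel.length`. The following sketch shows that rounding step and checks the equivalence; the class and method names are hypothetical, and Netty's own createWheel may differ in detail (for example in its range checks).

```java
/** Hypothetical sketch of rounding ticksPerWheel up to a power of two. */
final class PowerOfTwoWheel {
    private PowerOfTwoWheel() {}

    /** Rounds n (1 <= n <= 2^30) up to the next power of two. */
    static int normalize(int n) {
        if (n <= 0 || n > (1 << 30)) {
            throw new IllegalArgumentException("ticksPerWheel out of range: " + n);
        }
        int size = 1;
        while (size < n) {
            size <<= 1; // double until we reach or pass n
        }
        return size;
    }

    public static void main(String[] args) {
        int length = normalize(500); // 500 is rounded up to 512
        int mask = length - 1;
        for (long tick = 0; tick < 10_000; tick++) {
            // For non-negative ticks and a power-of-two length, AND equals the remainder.
            if ((tick & mask) != tick % length) {
                throw new AssertionError("mismatch at tick " + tick);
            }
        }
        System.out.println(length + " " + mask); // prints "512 511"
    }
}
```

The overflow check in the constructor serves the same arithmetic: tickDuration (in nanoseconds) multiplied by wheel.length must still fit in a long.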

Creating a Timeout

Creating a timeout does not add it to its bucket immediately. Instead, the timeout is first put into the timeouts queue and is moved into its bucket when the next tick arrives.

```java
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    // Start the worker if necessary and wait on a CountDownLatch until the timer's start time is initialized.
    start();

    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}
```
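The hand-off between newTimeout and the worker can be sketched with standard library types. In this hypothetical example, `ConcurrentLinkedQueue` stands in for Netty's multi-producer single-consumer queue, deadlines are stored relative to the start time exactly as in newTimeout, and a per-call cap imitates the 100000 limit in transferTimeoutsToBuckets. The class, fields and cap value are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: many threads enqueue, only the worker moves entries into buckets. */
final class TimeoutHandoff {
    private final Queue<Long> pending = new ConcurrentLinkedQueue<>(); // relative deadlines
    private final List<Long> bucketed = new ArrayList<>();              // touched only by the worker
    private final long startTime = System.nanoTime();
    private final int maxPerTick;

    TimeoutHandoff(int maxPerTick) {
        this.maxPerTick = maxPerTick;
    }

    /** Any thread: compute a deadline relative to startTime and enqueue it. */
    long submit(long delay, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
        pending.add(deadline);
        return deadline;
    }

    /** Worker thread only: move at most maxPerTick entries, leaving the rest for later ticks. */
    int transfer() {
        int moved = 0;
        for (; moved < maxPerTick; moved++) {
            Long deadline = pending.poll();
            if (deadline == null) {
                break; // queue drained
            }
            bucketed.add(deadline);
        }
        return moved;
    }

    int pendingCount() { return pending.size(); }
    int bucketedCount() { return bucketed.size(); }

    public static void main(String[] args) throws InterruptedException {
        TimeoutHandoff h = new TimeoutHandoff(3);
        Thread[] producers = new Thread[4];
        for (int i = 0; i < producers.length; i++) {
            producers[i] = new Thread(() -> h.submit(1, TimeUnit.SECONDS));
            producers[i].start();
        }
        for (Thread t : producers) {
            t.join();
        }
        System.out.println(h.transfer() + " " + h.transfer()); // prints "3 1"
    }
}
```

Because producers only ever touch the concurrent queue and the buckets are only ever touched by the worker, the buckets themselves need no synchronization.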

 
   
   
 
```java
public void start() {
    switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    // Wait until the startTime is initialized by the worker.
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}
```
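start() and Worker.run() coordinate through startTimeInitialized, a latch that the worker counts down once it has published startTime. A stripped-down sketch of that handshake follows; the class and field names are hypothetical, AtomicInteger replaces Netty's field updater, and the shutdown state is omitted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the start-time handshake between callers and the worker. */
final class StartHandshake {
    private static final int INIT = 0;
    private static final int STARTED = 1;

    private final AtomicInteger state = new AtomicInteger(INIT);
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    private volatile long startTime; // 0 means "not yet initialized"

    private final Thread worker = new Thread(() -> {
        long now = System.nanoTime();
        startTime = (now == 0) ? 1 : now; // keep 0 reserved as the sentinel
        startTimeInitialized.countDown(); // release every caller waiting in start()
    });

    /** Safe to call from many threads; only the first successful CAS starts the worker. */
    long start() {
        if (state.compareAndSet(INIT, STARTED)) {
            worker.start();
        }
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Mirrors the original: ignore and retry, the worker sets startTime almost immediately.
            }
        }
        return startTime;
    }
}
```

The `while (startTime == 0)` loop, rather than a single await(), is what makes it safe to swallow the interrupt: the caller simply waits again until the value is visible.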

Worker

Worker is an inner class that implements the Runnable interface. It is the task actually executed by the timeout-checking thread, and its main entry point is the run method.

```java
public void run() {
    // Initialize the startTime.
    startTime = System.nanoTime();
    if (startTime == 0) {
        // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
        startTime = 1;
    }

    // Notify the other threads waiting for the initialization at start().
    // Counting down the latch publishes the start time; until then, every newTimeout() call blocks in start().
    startTimeInitialized.countDown();

    do {
        // Sleep (via Thread.sleep) until the next tick arrives.
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            int idx = (int) (tick & mask);
            // First remove the timeouts that were cancelled.
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            // Move newly created timeouts from the queue into their buckets.
            transferTimeoutsToBuckets();
            // Then expire the due timeouts in the current bucket.
            bucket.expireTimeouts(deadline);
            // Advance the tick counter by one.
            tick++;
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // Fill the unprocessedTimeouts so we can return them from stop() method.
    for (HashedWheelBucket bucket: wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (;;) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    processCancelledTasks();
}
```
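waitForNextTick itself is not shown above. The following is a simplified sketch of what such a wait step can look like, assuming that tick N ends at tickDuration * (N + 1) nanoseconds after the start and that the returned elapsed time is what run() passes to expireTimeouts as its deadline. The class is hypothetical, it folds the tick increment into the wait for brevity, and it omits the shutdown and platform-specific handling found in Netty's version.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified sketch of the worker's wait-for-next-tick step. */
final class TickClock {
    private final long startTime = System.nanoTime();
    private final long tickDuration; // nanoseconds
    private long tick;               // number of ticks already processed

    TickClock(long tickDuration, TimeUnit unit) {
        this.tickDuration = unit.toNanos(tickDuration);
    }

    /** Milliseconds (rounded up) still to sleep before tick number `tick` ends. */
    static long sleepMillis(long tickDuration, long tick, long elapsed) {
        long deadline = tickDuration * (tick + 1);
        return (deadline - elapsed + 999_999) / 1_000_000;
    }

    /** Sleeps until the current tick ends and returns the elapsed nanos since start. */
    long waitForNextTick() throws InterruptedException {
        for (;;) {
            long elapsed = System.nanoTime() - startTime;
            long sleepMs = sleepMillis(tickDuration, tick, elapsed);
            if (sleepMs <= 0) {
                tick++;         // in the real run() loop, tick++ happens after the bucket is processed
                return elapsed; // timeouts with deadline <= elapsed may now expire
            }
            Thread.sleep(sleepMs);
        }
    }
}
```

Rounding the sleep up to a whole millisecond means the worker never wakes before the tick boundary. It may wake slightly after it, which is one reason the timer's precision is bounded by tickDuration.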

 
   
   
 
```java
private void processCancelledTasks() {
    for (;;) {
        // Drain the cancellation queue and run each task; each task removes a cancelled timeout from its bucket.
        Runnable task = cancelledTimeouts.poll();
        if (task == null) {
            // all processed
            break;
        }
        try {
            task.run();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown while process a cancellation task", t);
            }
        }
    }
}
```

Newly created timeouts are periodically moved from the queue into their buckets:

```java
private void transferTimeoutsToBuckets() {
    // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
    // adds new timeouts in a loop.
    // At most 100,000 timeouts are moved per call; the rest wait for the next call (normally the next tick).
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        // Skip timeouts that were cancelled before they reached a bucket.
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            // Was cancelled in the meantime.
            continue;
        }

        long calculated = timeout.deadline / tickDuration;
        timeout.remainingRounds = (calculated - tick) / wheel.length;

        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        // Compute the bucket this timeout belongs to.
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        // Append it to the bucket's linked list.
        bucket.addTimeout(timeout);
    }
}
```
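The lines that compute calculated, remainingRounds and stopIndex can be tried with concrete numbers. The sketch below copies that arithmetic into a standalone, hypothetical function, including the case of a timeout that is transferred only after its tick has already passed (which is why Math.max is there).

```java
/** Standalone copy of the slot arithmetic in transferTimeoutsToBuckets (hypothetical wrapper). */
final class SlotCalc {
    private SlotCalc() {}

    /** Returns {stopIndex, remainingRounds} for a timeout transferred at currentTick. */
    static long[] place(long deadline, long tickDuration, long currentTick, int wheelLength) {
        long mask = wheelLength - 1;
        long calculated = deadline / tickDuration;
        long remainingRounds = (calculated - currentTick) / wheelLength;
        long ticks = Math.max(calculated, currentTick); // never schedule into a bucket already passed
        return new long[] {ticks & mask, remainingRounds};
    }

    public static void main(String[] args) {
        long tickDuration = 100; // arbitrary units
        // On time: due at tick 20, transferred at tick 3 -> bucket 4 of 16, one full round first.
        long[] onTime = place(2050, tickDuration, 3, 16);
        // Late: due at tick 5, transferred at tick 9 -> the current bucket 9, fires this tick.
        long[] late = place(550, tickDuration, 9, 16);
        // prints "4,1 9,0"
        System.out.println(onTime[0] + "," + onTime[1] + " " + late[0] + "," + late[1]);
    }
}
```

Without Math.max, the late timeout would land in bucket 5, which the cursor has already passed, and would fire only on the next visit to bucket 5 at tick 21, a full revolution (16 ticks) after it was due. Note also that Java's integer division truncates toward zero, so the late timeout's remainingRounds comes out as 0 and expireTimeouts treats it as due.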

HashedWheelTimeout

HashedWheelTimeout is an inner class that wraps the TimerTask submitted by the user. Its main operations are expiring the timeout and cancelling it.

Expiration

```java
public void expire() {
    if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
        return;
    }

    try {
        // Run the user's callback and catch every Throwable so that a faulty task cannot kill the worker thread.
        task.run(this);
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
        }
    }
}
```
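Both expire() and cancel() begin with compareAndSetState(ST_INIT, ...), so whichever thread wins the CAS decides the outcome and the other call becomes a no-op. A minimal sketch of that state machine, using AtomicInteger for brevity; the class is a hypothetical stand-in, and unlike Netty's void expire() it returns a boolean so the outcome can be observed.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for HashedWheelTimeout's state handling. */
final class TimeoutState {
    static final int ST_INIT = 0;
    static final int ST_CANCELLED = 1;
    static final int ST_EXPIRED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_INIT);
    private final Runnable task;

    TimeoutState(Runnable task) {
        this.task = task;
    }

    /** Worker thread: run the task only if nobody cancelled it first. */
    boolean expire() {
        if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) {
            return false; // already cancelled (or expired)
        }
        try {
            task.run();
        } catch (Throwable t) {
            // Swallowed here (Netty logs a warning) so a faulty task cannot kill the worker thread.
        }
        return true;
    }

    /** Any thread: succeeds only if the timeout has neither expired nor been cancelled. */
    boolean cancel() {
        return state.compareAndSet(ST_INIT, ST_CANCELLED);
    }

    int state() {
        return state.get();
    }
}
```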

Cancellation

Cancellation is usually initiated by the user, and the user's thread is not the timer's internal timeout-checking thread. If cancel() removed the HashedWheelTimeout from its bucket directly, a lock would be needed to keep the two threads from corrupting the bucket. Netty avoids this: it changes the HashedWheelTimeout's state with a CAS operation and puts a removal task into a queue. The timeout is then removed from its bucket on the next tick by the worker thread, so no lock is required.

```java
public boolean cancel() {
    // only update the state it will be removed from HashedWheelBucket on next tick.
    if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
        return false;
    }
    // If a task should be canceled we create a new Runnable for this to another queue which will
    // be processed on each tick. So this means that we will have a GC latency of max. 1 tick duration
    // which is good enough. This way we can make again use of our MpscLinkedQueue and so minimize the
    // locking / overhead as much as possible.
    //
    // It is important that we not just add the HashedWheelTimeout itself again as it extends
    // MpscLinkedQueueNode and so may still be used as tombstone.
    timer.cancelledTimeouts.add(new Runnable() {
        @Override
        public void run() {
            HashedWheelBucket bucket = HashedWheelTimeout.this.bucket;
            if (bucket != null) {
                bucket.remove(HashedWheelTimeout.this);
            }
        }
    });
    return true;
}
```
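The same deferred-removal idea can be reproduced with standard types: cancel() flips a flag with a CAS and enqueues a removal Runnable, and the worker applies all pending removals at the start of its next tick, as processCancelledTasks does. In this hypothetical sketch a ConcurrentLinkedQueue and a HashSet stand in for Netty's MPSC queue and the bucket's linked list.

```java
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of lock-free cancellation via a deferred-removal queue. */
final class DeferredCancel {
    /** Bucket contents; mutated only by the worker thread, so it needs no lock. */
    final Set<String> bucket = new HashSet<>();
    private final Queue<Runnable> cancelledTimeouts = new ConcurrentLinkedQueue<>();

    final class Entry {
        final String name;
        private final AtomicBoolean cancelled = new AtomicBoolean();

        Entry(String name) {
            this.name = name;
            bucket.add(name); // simplification: assumed to run on the worker thread
        }

        /** Callable from any thread: flip the flag now, defer the structural change. */
        boolean cancel() {
            if (!cancelled.compareAndSet(false, true)) {
                return false; // already cancelled
            }
            cancelledTimeouts.add(() -> bucket.remove(name));
            return true;
        }
    }

    /** Worker thread, once per tick: apply every pending removal. */
    void processCancelledTasks() {
        for (Runnable r; (r = cancelledTimeouts.poll()) != null; ) {
            r.run();
        }
    }
}
```

The trade-off is the one the source comment names: a cancelled entry can stay referenced from its bucket for up to one tick before it becomes garbage.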

HashedWheelBucket

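HashedWheelBucket is an inner class that stores all HashedWheelTimeouts assigned to that bucket.
Operations on a bucket are mainly adding and removing HashedWheelTimeouts; there is no need to look them up, so the bucket stores them in a (doubly) linked list. Each bucket has two fields, head and tail:

```java
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
```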

Adding an Element

A plain linked-list append: the new HashedWheelTimeout is added to the end of the list.

```java
public void addTimeout(HashedWheelTimeout timeout) {
    assert timeout.bucket == null;
    timeout.bucket = this;
    if (head == null) {
        head = tail = timeout;
    } else {
        tail.next = timeout;
        timeout.prev = tail;
        tail = timeout;
    }
}
```

Removing an Element

```java
public void remove(HashedWheelTimeout timeout) {
    HashedWheelTimeout next = timeout.next;
    // remove timeout that was either processed or cancelled by updating the linked-list
    if (timeout.prev != null) {
        timeout.prev.next = next;
    }
    if (timeout.next != null) {
        timeout.next.prev = timeout.prev;
    }

    if (timeout == head) {
        // if timeout is also the tail we need to adjust the entry too
        if (timeout == tail) {
            tail = null;
            head = null;
        } else {
            head = next;
        }
    } else if (timeout == tail) {
        // if the timeout is the tail modify the tail to be the prev node.
        tail = timeout.prev;
    }
    // null out prev, next and bucket to allow for GC.
    timeout.prev = null;
    timeout.next = null;
    timeout.bucket = null;
}
```
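Because each HashedWheelTimeout carries its own prev/next pointers (an intrusive doubly linked list), a cancelled entry can be unlinked in O(1) without searching the bucket. The self-contained sketch below reproduces addTimeout and a condensed remove (equivalent for nodes that belong to the list) with a hypothetical Node class, so the pointer updates can be exercised in isolation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a bucket built as an intrusive doubly linked list. */
final class IntrusiveBucket {
    static final class Node {
        final String name;
        Node prev;
        Node next;
        IntrusiveBucket bucket;

        Node(String name) {
            this.name = name;
        }
    }

    private Node head;
    private Node tail;

    /** Append at the tail, as addTimeout does. */
    void add(Node n) {
        n.bucket = this;
        if (head == null) {
            head = tail = n;
        } else {
            tail.next = n;
            n.prev = tail;
            tail = n;
        }
    }

    /** Unlink in O(1); a null prev means n is the head, a null next means n is the tail. */
    void remove(Node n) {
        if (n.prev != null) n.prev.next = n.next; else head = n.next;
        if (n.next != null) n.next.prev = n.prev; else tail = n.prev;
        n.prev = n.next = null; // drop references to help GC
        n.bucket = null;
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }
}
```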

Expiry Check

Walk every element in the list and check whether it has expired. Elements that have expired or been cancelled are removed from the list.

```java
public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    // process all timeouts
    while (timeout != null) {
        boolean remove = false;
        // A timeout is due once its remainingRounds has dropped to 0.
        if (timeout.remainingRounds <= 0) {
            if (timeout.deadline <= deadline) {
                timeout.expire();
            } else {
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
            remove = true;
        } else if (timeout.isCancelled()) {
            remove = true;
        } else {
            timeout.remainingRounds --;
        }
        // store reference to next as we may null out timeout.next in the remove block.
        HashedWheelTimeout next = timeout.next;
        if (remove) {
            remove(timeout);
        }
        timeout = next;
    }
}
```
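To see how remainingRounds delays a timeout, here is a simplified single-bucket sketch of expireTimeouts. It is hypothetical: it uses java.util.LinkedList with an Iterator instead of Netty's intrusive list, and it omits the deadline sanity check and cancellation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/** Hypothetical single-bucket sketch of the remainingRounds countdown. */
final class RoundsDemo {
    static final class Entry {
        final String name;
        long remainingRounds;

        Entry(String name, long remainingRounds) {
            this.name = name;
            this.remainingRounds = remainingRounds;
        }
    }

    final LinkedList<Entry> bucket = new LinkedList<>();

    /** One visit of the cursor to this bucket; returns the names that fired. */
    List<String> expireTimeouts() {
        List<String> fired = new ArrayList<>();
        for (Iterator<Entry> it = bucket.iterator(); it.hasNext(); ) {
            Entry e = it.next();
            if (e.remainingRounds <= 0) {
                fired.add(e.name);
                it.remove();         // expired entries leave the bucket
            } else {
                e.remainingRounds--; // not yet due: wait for the next revolution
            }
        }
        return fired;
    }
}
```

An entry with remainingRounds = 2 is skipped on two visits and fires on the third, that is, after two full revolutions of the wheel.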

Q&A

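Q: What happens if a TimerTask callback takes a long time?
A: As the source shows, a single internal thread checks the buckets one after another and invokes the expiration callbacks in sequence. A slow callback therefore delays the processing of every subsequent timeout and degrades the timer's accuracy, so callbacks should avoid time-consuming operations.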
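One common way to follow this advice is to keep the callback itself trivial and hand the real work to a separate thread pool, so the timer thread only pays for a queue submission. The sketch below shows the pattern with plain java.util.concurrent types. The Callback interface is a hypothetical stand-in for io.netty.util.TimerTask (whose run method takes a Timeout argument), used so that the example runs without Netty on the classpath.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: keep timer callbacks short by offloading slow work. */
final class OffloadingCallback {
    /** Stand-in for a timer callback; not Netty's actual TimerTask signature. */
    interface Callback {
        void run() throws Exception;
    }

    /** Wraps slow work so that the callback only submits it to the pool and returns. */
    static Callback offload(ExecutorService pool, Runnable slowWork) {
        return () -> pool.execute(slowWork);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Callback cb = offload(pool, () -> {
            try {
                Thread.sleep(500); // stands in for blocking I/O or heavy computation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        long t0 = System.nanoTime();
        cb.run(); // returns without waiting for the slow work
        System.out.println("callback took " + (System.nanoTime() - t0) / 1_000_000 + " ms");
        pool.shutdown(); // lets the submitted work finish, then stops the pool
    }
}
```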

Further Thoughts

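  1. As the source shows, every call to HashedWheelBucket#expireTimeouts traverses all elements in the bucket, which is inevitably somewhat inefficient. One could consider keeping the list ordered by remainingRounds and, instead of comparing remainingRounds with 0, comparing it directly with the number of rounds the wheel has already completed. Then, as soon as one element is found not to have expired, none of the elements after it need to be checked.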
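The idea above can be sketched as follows, under stated assumptions: each entry stores the absolute round in which it becomes due (its due tick divided by the wheel length) instead of a countdown, and each bucket keeps its entries ordered by that round, here with a java.util.PriorityQueue rather than a sorted linked list. Expiry then stops at the first entry that is not yet due. This is a hypothetical design, not Netty code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical wheel whose buckets are ordered by absolute due round. */
final class SortedBucketWheel {
    static final class Entry {
        final String name;
        final long dueRound; // due tick / wheel length

        Entry(String name, long dueRound) {
            this.name = name;
            this.dueRound = dueRound;
        }
    }

    private final int wheelLength; // assumed to be a power of two
    private final List<PriorityQueue<Entry>> buckets = new ArrayList<>();

    SortedBucketWheel(int wheelLength) {
        this.wheelLength = wheelLength;
        for (int i = 0; i < wheelLength; i++) {
            buckets.add(new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.dueRound)));
        }
    }

    /** dueTick is assumed to be max(deadline / tickDuration, currentTick), as in the transfer code. */
    void add(String name, long dueTick) {
        buckets.get((int) (dueTick & (wheelLength - 1))).add(new Entry(name, dueTick / wheelLength));
    }

    /** Called once per tick for the bucket under the cursor; returns what fired. */
    List<String> expire(long tick) {
        PriorityQueue<Entry> bucket = buckets.get((int) (tick & (wheelLength - 1)));
        long currentRound = tick / wheelLength;
        List<String> fired = new ArrayList<>();
        // Entries are ordered by dueRound, so the first one not yet due ends the scan.
        while (!bucket.isEmpty() && bucket.peek().dueRound <= currentRound) {
            fired.add(bucket.poll().name);
        }
        return fired;
    }
}
```

The saving is not free: insertion becomes O(log n) instead of the O(1) list append, and removing an arbitrary cancelled entry from a PriorityQueue is O(n), so whether this pays off depends on how many long-delay timeouts share a bucket.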

