基于时间轮的定时器HashedWheelTimer

Posted 热爱编程的大忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于时间轮的定时器HashedWheelTimer相关的知识,希望对你有一定的参考价值。

基于时间轮的定时器HashedWheelTimer


一、前言

最近在阅读Redisson的源码时看到了内部使用了netty提供的这个组件,就想着看下这个定时器具体是如何实现的?

先介绍一下HashedWheelTimer,它是基于时间轮实现的一个定时器,它的优点是实现相对简单,缺点是无法精确、准时地执行定时任务,只能是近似执行

因为时间轮中每个刻度大小可能是100ms也可能1ms,所以在执行任务时,时间上会存在一点误差,在大部分网络应用中,IO任务的执行时间往往不需要那么精确,因此默认每个刻度小大是100ms,但你可以自己来调整刻度大小,最小是1ms。

简单介绍完HahsedWheelTimer,接下来我们先来看下时间轮的结构

二、时间轮的结构

时间轮类似于一个时钟,它和时钟一样是有刻度的,每个刻度大小可以是100ms也可以是1ms,如下图

上图的时间轮有6个刻度,每个刻度大小是100ms,也就是每过100ms会顺时针移动一个刻度,走完一圈需要600ms(下面要介绍的HashedWheelTimer默认刻度数是512,每个刻度大小默认是100ms)

工作原理:每往时间轮提交一个延时任务,会判断该任务的执行时间要放在哪个刻度上,比如在时间轮启动后的第100ms,提交了一个延时400ms执行的任务,那么该任务应该放在刻度5上,如果提交了一个延迟700ms执行的任务,那么该任务会放在刻度2上,并且会记录该任务还需要走一圈时间轮才能执行。时间轮每移动一个刻度,就会执行当前刻度上的任务,一个刻度上的任务可能会有多个

因为HashedWheelTimer是基于时间轮的定时器,所以接下来看一下HashedWheelTimer是如何实现的?

三、HashedWheelTimer的相关组件

这里我们可以先看下HashedWheelTimer的UML图,能够对相关组件先有个整体的认识,如下

  • Timer: 定时器接口,提供提交延时任务newTimeout、停止定时器等方法

  • HashedWheelTimer: 实现Timer接口,内部包含工作线程Worker、时间轮wheel、延时任务队列timeouts、线程池taskExecutor等

  • HashedWheelBucket:上面的时间轮wheel是一个HashedWheelBucket数组,每一个刻度对应一个HashedWheelBucket,而每一个HashedWheelBucket内部是一个HashedWheelTimeout的双向链表,如下图

  • TimerTask: 延时任务接口,内部只提供一个run方法用于执行

  • Timeout: 对Timer、TimerTask的封装

  • HashedWheelTimeout: 包含了任务的执行时间dealline、所需要的圈数remainingRounds、双向链表中上一个以及下一个HashedWheelTimeout、所在的HashedWheelBucket等

四、HashedWheelTimer的工作流程

大致工作流程如下图:

从上图可以看到,主要分为4步骤,但是准确来说应该是有5步:

  1. 提交延时任务给HashedWheelTimer,延时任务会先放到任务队列timeouts中
  2. 工作线程Worker会从任务队列timeouts中获取任务
  3. 将获取到的HashedWheelTimeout任务放到指定的HashedWheelBucket中
  4. 取出当前刻度对应的HashedWheelBucket的所有HashedWheelTimeout来执行
  5. 将刻度tick加1,再回到第二步,如此循环

五、源码解读

5.1 HahedWheelTimer的关键属性

关键属性如下:

  • Worker worker:工作线程Worker
  • int workerState:工作线程状态
  • long tickDuration:刻度大小,默认是100ms
  • HashedWheelBucket[] wheel时间轮的每个刻度会对应一个HashedWheelBucket
  • Queue timeouts任务队列
  • Queue cancelledTimeouts:已取消的任务队列
  • AtomicLong pendingTimeouts:正在处理的任务数
  • Executor taskExecutor:线程池,用于执行任务
  • long startTime定时器的启动时间

5.2 提交延时任务给HahedWheelTimer

通过newTimeout方法来提交延时任务,newTimeout方法步骤如下:

  1. 启动工作线程Worker,如果是首次启动,设置启动时间startTime,如果已启动,则跳过
  2. 计算延时任务的deadline(当前时间 + 延迟时间 - 启动时间startTime),用于判断后续放到时间轮的哪个HashedWheelBucket中
  3. 将延时任务封装为HashedWheelTimeout,并添加到任务队列timeouts

结合源码来看:

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) 
    // 部分代码省略
    
    // 启动工作线程Worker
    start();

    // 计算deadline
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    if (delay > 0 && deadline < 0) 
        deadline = Long.MAX_VALUE;
    
    
    // 封装为HashedWheelTimeout,并添加到任务队列timeouts中
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;

5.3 工作线程Worker运行的具体步骤

Worker类中有一个关键的属性tick,代表相对于定时器的启动时间startTime,当前已经走到了哪个刻度,tick只会一直往上递增,初始值为0

具体步骤如下:

  1. 等到下一个刻度来临,即当前时间 > 当前刻度tick的结束时间
    a) 计算当前刻度tick的结束时间,比如Worker刚启动,当前刻度tick为0,那么刻度tick的结束时间 = tickDuration * (tick + 1),即100ms
    b) 判断当前时间(相对于启动时间startTime)是否大于当前刻度的结束时间,如果大于,说明当前时间已经过了当前刻度的结束时间,开始准备处理当前刻度的所有任务。如果小于,说明当前时间还没到当前刻度的结束时间,主动sleep一段时间后继续判断,直到当前时间大于当前刻度的结束时间。
  2. 从任务队列timeouts中获取任务,将延时任务的deadline除以tickDuration,计算出该任务的总刻度数以及还需要的圈数,通过 **总刻度数 & (wheel.length -1 )**来算出放在哪个HashedWheelBucket中(比如算出A任务的总刻度数 = 1026,当前刻度 = 25,时间轮的刻度有512个,那么算出还需要的圈数是1【如果当前刻度 = 1,那么还需要的圈数会是2】,放在下标为2的HashedWheelBucket中
  3. 获取当前刻度对应的HashedWheelBucket,从head开始逐一遍历任务链表,如果延时任务的所需圈数为0,开始执行任务,否则所需圈数减1。
  4. 刻度tick加1,回到第一步,如此循环

结合源码来看

public void run() 
    // 初始化定时器的启动时间startTime
    startTime = System.nanoTime();
    startTimeInitialized.countDown();

    do 
        // 1、等到下一个刻度来临,即当前时间 > 当前刻度tick的结束时间
        final long deadline = waitForNextTick();
        if (deadline > 0) 
            // 获取当前刻度tick对应的HashedWheelBucket
            int idx = (int) (tick & mask);
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            
            // 2、从任务队列timeouts中获取任务,并将任务放入到对应的HashedWheelBucket中        
            transferTimeoutsToBuckets();
            // 3、执行当前刻度tick对应的HashedWheelBucket中的所有任务
            bucket.expireTimeouts(deadline);
            // 4、将当前刻度tick加1
            tick++;
        
     while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // 部分代码省略

这里我们先来看下第一步waitForNextTick方法的具体实现

private long waitForNextTick() 
    long deadline = tickDuration * (tick + 1);

    for (;;) 
        // 相对于startTime的当前时间
        final long currentTime = System.nanoTime() - startTime;
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
   
        // 如果当前刻度tick的结束时间 < 当前时间,说明当前时间已经过了当前刻度的结束时间,直接返回当前时间
        if (sleepTimeMs <= 0) 
            if (currentTime == Long.MIN_VALUE) 
                return -Long.MAX_VALUE;
             else 
                return currentTime;
            
        
        
        // 否则主动sleep一段时间,上面的条件成立
        try 
            Thread.sleep(sleepTimeMs);
         catch (InterruptedException ignored) 
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) 
                return Long.MIN_VALUE;
            
        
    

接着看下第二步的transferTimeoutsToBuckets方法,如下

private void transferTimeoutsToBuckets() 
    // 这里一次最多从队列里获取100000个任务
    for (int i = 0; i < 100000; i++) 
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) 
            // 代表队列里已经没有任务,直接返回
            break;
        
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) 
            continue;
        
        
        // 计算总刻度数 = 延时任务的deadline / 刻度大小
        long calculated = timeout.deadline / tickDuration;
        // 计算还需要的圈数 = 总刻度数 - 当前刻度 / 时间轮的刻度数
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        
        final long ticks = Math.max(calculated, tick);
        // 计算放在哪个下标
        int stopIndex = (int) (ticks & mask);
        HashedWheelBucket bucket = wheel[stopIndex];
        // 将该任务放入到对应的HashedWheelBucket中
        bucket.addTimeout(timeout);
    

最后看下第三步bucket.expireTimeouts,源码如下:

public void expireTimeouts(long deadline) 
    HashedWheelTimeout timeout = head;

    // 处理该HashedWheelBucket的所有任务
    while (timeout != null) 
        HashedWheelTimeout next = timeout.next;
        if (timeout.remainingRounds <= 0) 
            // 将任务从双向链表中移除
            next = remove(timeout);
            if (timeout.deadline <= deadline) 
                // 执行任务
                timeout.expire();
             else 
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            
         else if (timeout.isCancelled()) 
            next = remove(timeout);
         else 
            // 如果所需圈数 > 0,则将其减1
            timeout.remainingRounds --;
        
        timeout = next;
    

至此,工作线程Worker运行的具体步骤以及部分源码的解读就完成了

六、总结

HashedWheelTimer只是定时器的一种简单实现,像java中常见的定时器还有Timer、ScheduledThreadPoolExecutor等,从上面分析它的实现原理可知,它无法应用于需要精确执行的场景,但是在网络应用中,IO任务的执行时间往往不需要精确,所以它可以在任务较多、但任务不需要精确执行的场景下进行使用。


参考

【Netty】八、基于时间轮的定时器HashedWheelTimer

以上是关于基于时间轮的定时器HashedWheelTimer的主要内容,如果未能解决你的问题,请参考以下文章

Netty HashedWheelTimer 源码解析

基于时间轮的定时器

netty系列之:HashedWheelTimer一种定时器的高效实现

netty定时器HashedWheelTimer(zz)

Netty HashedWheelTimer 介绍

「linux」定时器方案:红黑树最小堆和时间轮的原理