javaHashedWheelTimer 使用及源码分析

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了javaHashedWheelTimer 使用及源码分析相关的知识,希望对你有一定的参考价值。

1.概述

转载:HashedWheelTimer 使用及源码分析

本文介绍的 HashedWheelTimer 是来自于 Netty 的工具类,在 netty-common 包中。它用于实现延时任务。另外,下面介绍的内容和 Netty 无关。

如果你看过 Dubbo 的源码,一定会在很多地方看到它。在需要失败重试的场景中,它是一个非常方便好用的工具。

本文将会介绍 HashedWheelTimer 的使用,以及在后半部分分析它的源码实现。

2.接口概览

在介绍它的使用前,先了解一下它的接口定义,以及和它相关的类。

HashedWheelTimer 是接口 io.netty.util.Timer 的实现,从面向接口编程的角度,我们其实不需要关心 HashedWheelTimer,只需要关心接口类 Timer 就可以了。这个 Timer 接口只有两个方法:

public interface Timer 

    // 创建一个定时任务
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

    // 停止所有的还没有被执行的定时任务
    Set<Timeout> stop();

Timer 是我们要使用的任务调度器,我们可以从方法上看出,它提交一个任务 TimerTask,返回的是一个 Timeout 实例。所以这三个类之间的关系大概是下面这样的:


TimerTask 非常简单,就一个 run() 方法:

public interface TimerTask 
    void run(Timeout timeout) throws Exception;

当然这里有点意思的是,它把 Timeout 的实例也传进来了,我们平时的代码习惯,都是单向依赖。

这样做也有好处,那就是在任务执行过程中,可以通过 timeout 实例来做点其他的事情。

public interface Timeout 
    Timer timer();
    TimerTask task();
    boolean isExpired();
    boolean isCancelled();
    boolean cancel();

它持有上层的 Timer 实例,和下层的 TimerTask 实例,然后取消任务的操作也在这里面。

3.HashedWheelTimer 使用

有了第一节介绍的接口信息,其实我们很容易就可以使用它了。我们先来随意写几行:

// 构造一个 Timer 实例
Timer timer = new HashedWheelTimer();

// 提交一个任务,让它在 5s 后执行
Timeout timeout1 = timer.newTimeout(new TimerTask() 
    @Override
    public void run(Timeout timeout) 
        System.out.println("5s 后执行该任务");
    
, 5, TimeUnit.SECONDS);

// 再提交一个任务,让它在 10s 后执行
Timeout timeout2 = timer.newTimeout(new TimerTask() 
    @Override
    public void run(Timeout timeout) 
        System.out.println("10s 后执行该任务");
    
, 10, TimeUnit.SECONDS);

// 取消掉那个 5s 后执行的任务
if (!timeout1.isExpired()) 
    timeout1.cancel();


// 原来那个 5s 后执行的任务,已经取消了。这里我们反悔了,我们要让这个任务在 3s 后执行
// 我们说过 timeout 持有上、下层的实例,所以下面的 timer 也可以写成 timeout1.timer()
timer.newTimeout(timeout1.task(), 3, TimeUnit.SECONDS);

有钱人终成眷属 没钱人终生痛苦

通过这几行代码,大家就可以非常熟悉这几个类的使用了,因为它们真的很简单。

我们来看一下 Dubbo 中的一个例子。

下面这个代码修改自 Dubbo 的集群调用策略 FailbackClusterInvoker 中:

它在调用 provider 失败以后,返回空结果给消费端,然后由后台线程执行定时任务重试,多用于消息通知这种场景。

public class Application 

    public static void main(String[] args) 
        Application app = new Application();
        app.invoke();
    

    private static final Logger log = LoggerFactory.getLogger(Application.class);

    private volatile Timer failTimer = null;

    public void invoke() 
        try 
            doInvoke();
         catch (Throwable e) 
            log.error("调用 doInvoke 方法失败,5s 后将进入后台的自动重试,异常信息: ", e);
            addFailed(() -> doInvoke());
        
    

    // 实际的业务实现
    private void doInvoke() 
        // 这里让这个方法故意失败
        throw new RuntimeException("故意抛出异常");
    

    private void addFailed(Runnable task) 
        // 延迟初始化
        if (failTimer == null) 
            synchronized (this) 
                if (failTimer == null) 
                    failTimer = new HashedWheelTimer();
                
            
        
        RetryTimerTask retryTimerTask = new RetryTimerTask(task, 3, 5);
        try 
            // 5s 后执行第一次重试
            failTimer.newTimeout(retryTimerTask, 5, TimeUnit.SECONDS);
         catch (Throwable e) 
            log.error("提交定时任务失败,exception: ", e);
        
    

下面是里面使用到的 RetryTimerTask 类,当然,你也可以选择写成内部类:

public class RetryTimerTask implements TimerTask 

    private static final Logger log = LoggerFactory.getLogger(RetryTimerTask.class);

    // 每隔几秒执行一次
    private final long tick;

    // 最大重试次数
    private final int retries;

    private int retryTimes = 0;

    private Runnable task;

    public RetryTimerTask(Runnable task, long tick, int retries) 
        this.tick = tick;
        this.retries = retries;
        this.task = task;
    

    @Override
    public void run(Timeout timeout) 
        try 
            task.run();
         catch (Throwable e) 
            if ((++retryTimes) >= retries) 
                // 重试次数超过了设置的值
                log.error("失败重试次数超过阈值: ,不再重试", retries);
             else 
                log.error("重试失败,继续重试");
                rePut(timeout);
            
        
    

    // 通过 timeout 拿到 timer 实例,重新提交一个定时任务
    private void rePut(Timeout timeout) 
        if (timeout == null) 
            return;
        
        Timer timer = timeout.timer();
        if (timeout.isCancelled()) 
            return;
        
        timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
    


上面的代码也非常简单,在调用 doInvoke() 方法失败以后,提交一个定时任务在 5s 后执行重试,如果还是失败,之后每 3s 重试一次,最多重试 5 次,如果重试 5 次都失败,记录错误日志,不再重试。

打印的日志如下:

15:47:36.232 [main] ERROR c.j.n.timer.Application - 调用 doInvoke 方法失败,5s 后将进入后台的自动重试,异常信息: 
java.lang.RuntimeException: 故意抛出异常
    at com.javadoop.nettylearning.timer.Application.doInvoke(Application.java:36)
    at com.javadoop.nettylearning.timer.Application.invoke(Application.java:28)
    at com.javadoop.nettylearning.timer.Application.main(Application.java:19)
15:47:41.793 [pool-1-thread-1] ERROR c.j.n.timer.RetryTimerTask - 重试失败,继续重试
15:47:44.887 [pool-1-thread-1] ERROR c.j.n.timer.RetryTimerTask - 重试失败,继续重试
15:47:47.986 [pool-1-thread-1] ERROR c.j.n.timer.RetryTimerTask - 重试失败,继续重试
15:47:51.084 [pool-1-thread-1] ERROR c.j.n.timer.RetryTimerTask - 重试失败,继续重试
15:47:54.186 [pool-1-thread-1] ERROR c.j.n.timer.RetryTimerTask - 失败重试次数超过阈值: 5,不再重试

HashedWheelTimer 的使用确实非常简单,如果你是来学习怎么使用它的,那么看到这里就可以了。

4.HashedWheelTimer 源码分析

大家肯定都知道或听说过,它用的是一个叫做时间轮(下载算法介绍PPT)的算法,看下面我画的图:


我这里先说说大致的执行流程,之后再进行细致的源码分析。

默认地,时钟每 100ms 滴答一下(tick),往前走一格,共 512 格,走完一圈以后继续下一圈。把它想象成生活中的钟表就可以了。

内部使用一个长度为 512 的数组存储,数组元素(bucket)的数据结构是链表,链表每个元素代表一个任务,也就是我们前面介绍的 Timeout 的实例。

提交任务的线程,只要把任务往虚线上面的任务队列中存放即可返回。工作线程是单线程,一旦开启,不停地在时钟上绕圈圈。

仔细看下面的介绍:

  1. 工作线程到达每个时间整点的时候,开始工作。在 HashedWheelTimer 中,时间都是相对时间,工作线程的启动时间,定义为时间的 0 值。因为一次 tick 是 100ms(默认值),所以 100ms、200ms、300ms… 就是我说的这些整点。

  2. 如上图,当时间到 200ms 的时候,发现任务队列有任务,取出所有的任务。

  3. 按照任务指定的执行时间,将其分配到相应的 bucket 中。如上图中,任务2 和任务6指定的时间为 100ms~200ms 这个区间,就被分配到第二个 bucket 中,形成链表,其他任务同理。

这里还有轮次的概念,不过不用着急,比如任务 6 指定的时间可能是 150ms + (512*100ms),它也会落在这个 bucket 中,但是它是下一个轮次才能被执行的。

任务分配到 bucket 完成后,执行该次 tick 的真正的任务,也就是落在第二个 bucket 中的任务 2 和任务 6。

假设执行这两个任务共消耗了 50ms,到达 250ms 的时间点,那么工作线程会休眠 50ms,等待进入到 300ms 这个整点。

如果这两个任务执行的时间超过 100ms 怎么办?这个问题就要看源码来解答了。

开始源码分析。我们从它的默认构造器开始,一步步到达最后一个最复杂的构造器:

public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
        long maxPendingTimeouts) 
  ......

简单说一下各个参数:

  1. threadFactory:定时任务都是后台任务,需要开启线程,我们通常会通过自定义 threadFactory 来命名线程,嫌麻烦就使用 Executors.defaultThreadFactory()

  2. tickDuration 和 timeUnit 定义了一格的时间长度,默认的就是 100ms。

  3. ticksPerWheel 定义了一圈有多少格,默认的就是 512;

  4. leakDetection:用于追踪内存泄漏,本文不会介绍它,感兴趣的读者请自行去了解它。

  5. maxPendingTimeouts:最大允许等待的 Timeout 实例数,也就是我们可以设置不允许太多的任务等待。如果未执行任务数达到阈值,那么再次提交任务会抛出 RejectedExecutionException 异常。默认不限制。

5.初始化 HashedWheelTimer

public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
        long maxPendingTimeouts) 

    // ...... 参数检查

    // 初始化时间轮,这里做了向上"取整",保持数组长度为 2 的 n 次方
    wheel = createWheel(ticksPerWheel);
    // 掩码,用来做取模
    mask = wheel.length - 1;

    // 100ms 转换为纳秒 100*10^6
    this.tickDuration = unit.toNanos(tickDuration);

    // 防止溢出
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) 
        throw new IllegalArgumentException(String.format(
                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                tickDuration, Long.MAX_VALUE / wheel.length));
    
    // 创建工作线程,这里没有启动线程。后面会看到,在第一次提交任务的时候会启动线程
    workerThread = threadFactory.newThread(worker);

    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

    // 赋值最大允许等待任务数
    this.maxPendingTimeouts = maxPendingTimeouts;

    // 如果超过 64 个 HashedWheelTimer 实例,它会打印错误日志提醒你
    // Netty 是真的到位,就怕你会用错这个工具,到处实例化它。而且它只会报错一次。
    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
        WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) 
        reportTooManyInstances();
    

上面,HashedWheelTimer 完成了初始化,初始化了时间轮数组 HashedWheelBucket[],稍微看一下内部类 HashedWheelBucket,可以看到它是一个链表的结构。这个很好理解,因为每一格可能有多个任务。

6.提交第一个任务

@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) 
    if (task == null) 
        throw new NullPointerException("task");
    
    if (unit == null) 
        throw new NullPointerException("unit");
    

    // 校验等待任务数是否达到阈值 maxPendingTimeouts
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) 
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
            + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
            + "timeouts (" + maxPendingTimeouts + ")");
    

    // 如果工作线程没有启动,这里负责启动
    start();

    /** 下面的代码,构建 Timeout 实例,将其放到任务队列中。 **/

    // deadline 是一个相对时间,相对于 HashedWheelTimer 的启动时间
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Guard against overflow.
    if (delay > 0 && deadline < 0) 
        deadline = Long.MAX_VALUE;
    
    // timeout 实例,一个上层依赖 timer,一个下层依赖 task,另一个是任务到期时间
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    // 放到 timeouts 队列中
    timeouts.add(timeout);
    return timeout;


提交任务的操作非常简单,实例化 Timeout,然后放到任务队列中。

我们可以看到,这里使用的优先级队列是一个 MPSC(Multiple Producer Single Consumer)的队列,刚好适用于这里的多生产线程,单消费线程的场景。而在 Dubbo 中,使用的队列是 LinkedBlockingQueue,它是一个以链表方式组织的线程安全的队列。

另外就是注意这里调用的 start() 方法,如果该任务是第一个提交的任务,它会负责工作线程的启动。

7.工作线程开始工作

其实只要看懂下面的几行代码,HashedWheelTimer 的源码就非常简单了。

private final class Worker implements Runnable 
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

    // tick 过的次数,前面说过,时针每 100ms tick 一次
    private long tick;

    @Override
    public void run() 
        // 在 HashedWheelTimer 中,用的都是相对时间,所以需要启动时间作为基准,并且要用 volatile 修饰
        startTime = System.nanoTime();
        if (startTime == 0) 
            // 这里不是很看得懂...请知道的读者不吝赐教
            startTime = 1;
        

        // 第一个提交任务的线程正 await 呢,唤醒它
        startTimeInitialized.countDown();

        // 接下来这个 do-while 是真正执行任务的地方,非常重要
        do 
            // 往下滑,就在当前的代码块里面,倒数第二个方法
            // 比如之前介绍的图,那返回值 deadline 就是 200ms
            final long deadline = waitForNextTick();

            if (deadline > 0) 
                // 该次 tick,bucket 数组对应的 index
                int idx = (int) (tick & mask);

                // 处理一下已经取消的任务,可忽略。
                processCancelledTasks();

                // bucket
                HashedWheelBucket bucket = wheel[idx];

                // 将队列中所有的任务转移到相应的 buckets 中。细节往下看,这个代码块的下一个方法。
                transferTimeoutsToBuckets();

                // 执行进入到这个 bucket 中的任务
                bucket.expireTimeouts(deadline);

                tick++;
            
         while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        /* 到这里,说明这个 timer 要关闭了,做一些清理工作 */

        // 将所有 bucket 中没有执行的任务,添加到 unprocessedTimeouts 这个 HashSet 中,
        // 主要目的是用于 stop() 方法返回
        for (HashedWheelBucket bucket: wheel) 
            bucket.clearTimeouts(unprocessedTimeouts);
        
        // 将任务队列中的任务也添加到 unprocessedTimeouts 中
        for (;;) 
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) 
                break;
            
            if (!timeout.isCancelled()) 
                unprocessedTimeouts.add(timeout);
            
        
        processCancelledTasks();
    

    private void transferTimeoutsToBuckets() 
        // 这里一个 for 循环,还特地限制了 10 万次,就怕你写错代码,一直往里面扔任务,可能实际也没什么用吧
        for (int i = 0; i < 100000; i++) 
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) 
                // 没有任务了
                break;
            
            if (timeout.state<

以上是关于javaHashedWheelTimer 使用及源码分析的主要内容,如果未能解决你的问题,请参考以下文章

gitlab的fork及源项目的同步

Linux(Ubuntu系统)安装yum及源的更新(详细操作+文字描述!!!)

网络编程

Socket

UDP/TCP

UDP通讯协议