Netty HashedWheelTimer 介绍

Posted 绝世好阿狸

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty HashedWheelTimer 介绍相关的知识,希望对你有一定的参考价值。

HashedWheelTimer是netty开发包里时间轮组件,可以用于提交延迟任务。Java里的Time组件也具备相同的功能,不过Time是基于优先队列实现的,相当于需要对所有的任务基于执行时间排个序,复杂度是logn。而HashedWheelTimer是另一种思想,预先放置一定数量的任务槽,任务提交时,根据延迟时间放入对应的槽位。工作线程利用sleep模拟tick,将到达的槽位里的任务取出,依次执行。极端情况下是On,如果槽位足够大,接近于O1。

使用

    public static void main(String args[]) 
        HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 100);
        timer.start();
        TimerTask task = new TimerTask() 
            @Override
            public void run(Timeout timeout) throws Exception 
                System.out.println("任务执行");
            
        ;

        timer.newTimeout(task, 3, TimeUnit.SECONDS);
        timer.newTimeout(task, 4, TimeUnit.SECONDS);
    

原理

1.接口

HashedWheelTimer类实现了Timer接口,Timer接口是netty包下的接口,和Java自带的Timer类不同。先看一下这个接口:

public interface Timer 

    /**
     * Schedules the specified @link TimerTask for one-time execution after
     * the specified delay.
     *
     * @return a handle which is associated with the specified task
     *
     * @throws IllegalStateException if this timer has been
     *                               @linkplain #stop() stopped already
     */
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

    /**
     * Releases all resources acquired by this @link Timer and cancels all
     * tasks which were scheduled but not executed yet.
     *
     * @return the handles associated with the tasks which were canceled by
     *         this method
     */
    Set<Timeout> stop();

可以看到Timer接口的作用是:提供了一种延迟执行任务的能力。里面包含两个核心方法:newTimeout以及stop方法。newTimeout方法允许我们提交一个延迟任务。stop方法停止当前时间轮,并立即执行未执行完的任务。这里我们重点看下newTimeout方法,参数一是TimerTask类型,是netty对于延迟任务的封装接口:

public interface TimerTask 

    /**
     * Executed after the delay specified with
     * @link Timer#newTimeout(TimerTask, long, TimeUnit).
     *
     * @param timeout a handle which is associated with this task
     */
    void run(Timeout timeout) throws Exception;

其核心代码就是run方法。run方法有一个Timeout类型的参数,这是netty包下的另一个接口:

public interface Timeout 

    /**
     * Returns the @link Timer that created this handle.
     */
    Timer timer();

    /**
     * Returns the @link TimerTask which is associated with this handle.
     */
    TimerTask task();

    /**
     * Returns @code true if and only if the @link TimerTask associated
     * with this handle has been expired.
     */
    boolean isExpired();

    /**
     * Returns @code true if and only if the @link TimerTask associated
     * with this handle has been cancelled.
     */
    boolean isCancelled();

    /**
     * Attempts to cancel the @link TimerTask associated with this handle.
     * If the task has been executed or cancelled already, it will return with
     * no side effect.
     *
     * @return True if the cancellation completed successfully, otherwise false
     */
    boolean cancel();

这个接口是干啥的?代表一个TimerTask的执行状态,类似Java里的FutureTask,不仅仅包含了任务信息,也包含任务执行的状态,比如通过Timeout,我们可以拿到任务本身,还可以拿到任务执行结果以及执行该任务的Timer类实例等信息。至此,核心接口职责基本清晰。

 

2.构造函数

下面先看HashedWheelTimer的构造函数:

public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel) 

    if (threadFactory == null) 
        throw new NullPointerException("threadFactory");
    
    if (unit == null) 
        throw new NullPointerException("unit");
    
    if (tickDuration <= 0) 
        throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
    
    if (ticksPerWheel <= 0) 
        throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
    

    // Normalize ticksPerWheel to power of two and initialize the wheel.
    wheel = createWheel(ticksPerWheel);
    mask = wheel.length - 1;

    // Convert tickDuration to nanos.
    this.tickDuration = unit.toNanos(tickDuration);

    // Prevent overflow.
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) 
        throw new IllegalArgumentException(String.format(
                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                tickDuration, Long.MAX_VALUE / wheel.length));
    

    workerThread = threadFactory.newThread(worker);
    leak = leakDetector.open(this);

看一下关键的参数:

* @param tickDuration   时间轮走一个花费的时间
* @param unit           时间单位
* @param ticksPerWheel  一轮总共多少格

先初始化了wheel对象,wheel对象是一个数组,每一个元素就是时间轮的一个格子。格子里存放的是任务列表,netty用的是set结构。关于wheel变量的初始化在createWheel函数中:

private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) 
    if (ticksPerWheel <= 0) 
        throw new IllegalArgumentException(
                "ticksPerWheel must be greater than 0: " + ticksPerWheel);
    
    if (ticksPerWheel > 1073741824) 
        throw new IllegalArgumentException(
                "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
    

    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
    for (int i = 0; i < wheel.length; i ++) 
        wheel[i] = Collections.newSetFromMap(
                PlatformDependent.<HashedWheelTimeout, Boolean>newConcurrentHashMap());
    
    return wheel;

根据格子数初始化时间轮数据。最终格子数是2的n次方,这样是为了将取摸运算转换为位运算。

构造完wheel变量以后,接着初始化tickDuration变量和线程变量,将worker工作线程装入主线程对象中。

 

3.启动

public void start() 
    switch (workerState.get()) 
        case WORKER_STATE_INIT:
            if (workerState.compareAndSet(WORKER_STATE_INIT, WORKER_STATE_STARTED)) 
                workerThread.start();
            
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    

    // Wait until the startTime is initialized by the worker.
    while (startTime == 0) 
        try 
            startTimeInitialized.await();
         catch (InterruptedException ignore) 
            // Ignore - it will be ready very soon.
        
    

start方法使用cas将时间轮的state变成开始状态,然后启动工作线程。

 

4.工作线程

public void run() 
    // Initialize the startTime.
    startTime = System.nanoTime();
    if (startTime == 0) 
        // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
        startTime = 1;
    

    // Notify the other threads waiting for the initialization at start().
    startTimeInitialized.countDown();

    List<HashedWheelTimeout> expiredTimeouts = new ArrayList<HashedWheelTimeout>();

    do 
        final long deadline = waitForNextTick();
        if (deadline > 0) 
            fetchExpiredTimeouts(expiredTimeouts, deadline);
            notifyExpiredTimeouts(expiredTimeouts);
        
     while (workerState.get() == WORKER_STATE_STARTED);

这里是netty 4.0.19版本的实现,和早期的时间轮实现不同。核心方法在do while循环体内。首先调用waitForNextTick方法sleep到下一个格子,然后将格子里的任务拿出来执行。

private long waitForNextTick() 
    long deadline = tickDuration * (tick + 1);

    for (;;) 
        final long currentTime = System.nanoTime() - startTime;
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

        if (sleepTimeMs <= 0) 
            if (currentTime == Long.MIN_VALUE) 
                return -Long.MAX_VALUE;
             else 
                return currentTime;
            
        

        // Check if we run on windows, as if thats the case we will need
        // to round the sleepTime as workaround for a bug that only affect
        // the JVM if it runs on windows.
        //
        // See https://github.com/netty/netty/issues/356
        if (PlatformDependent.isWindows()) 
            sleepTimeMs = sleepTimeMs / 10 * 10;
        

        try 
            Thread.sleep(sleepTimeMs);
         catch (InterruptedException e) 
            if (workerState.get() == WORKER_STATE_SHUTDOWN) 
                return Long.MIN_VALUE;
            
        
    

这个方法首先计算出下一个tick的时间,然后减去当前时间,计算出需要sleep的时间,sleep。

private void fetchExpiredTimeouts(
        List<HashedWheelTimeout> expiredTimeouts, long deadline) 

    // Find the expired timeouts and decrease the round counter
    // if necessary.  Note that we don't send the notification
    // immediately to make sure the listeners are called without
    // an exclusive lock.
    lock.writeLock().lock();
    try 
        fetchExpiredTimeouts(expiredTimeouts, wheel[(int) (tick & mask)].iterator(), deadline);
     finally 
        // Note that the tick is updated only while the writer lock is held,
        // so that newTimeout() and consequently new HashedWheelTimeout() never see an old value
        // while the reader lock is held.
        tick ++;
        lock.writeLock().unlock();
    

这里根据当前tick值取到对应的格子,因为其他线程可能会提交任务,涉及到并发修改集合,所以这里加了锁。这块的实现和早期的netty实现不同。

private void fetchExpiredTimeouts(
        List<HashedWheelTimeout> expiredTimeouts,
        Iterator<HashedWheelTimeout> i, long deadline) 

    while (i.hasNext()) 
        HashedWheelTimeout timeout = i.next();
        if (timeout.remainingRounds <= 0) 
            i.remove();
            if (timeout.deadline <= deadline) 
                expiredTimeouts.add(timeout);
             else 
                // The timeout was placed into a wrong slot. This should never happen.
                throw new Error(String.format(
                        "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            
         else 
            timeout.remainingRounds --;
        
    

定位到具体的格子后,将set里的所有任务遍历一遍,如果圈数和执行时间都到了,就加入待执行list,否则圈数没到,将圈数减一。

private void notifyExpiredTimeouts(
        List<HashedWheelTimeout> expiredTimeouts) 
    // Notify the expired timeouts.
    for (int i = expiredTimeouts.size() - 1; i >= 0; i --) 
        expiredTimeouts.get(i).expire();
    

    // Clean up the temporary list.
    expiredTimeouts.clear();

拿到当前格子里可执行的任务后,逐个调用其expire方法执行任务,最终调用clear从格子内移除。下面再看下HasheWheelTimeout的实现,该类是之前介绍的Timeout接口的实现类:

public void expire() 
    if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) 
        return;
    

    try 
        task.run(this);
     catch (Throwable t) 
        if (logger.isWarnEnabled()) 
            logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
        
    

可以看到,其expire方法就是调用对应的任务的run方法。

 

5.任务提交

@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) 
    start();

    if (task == null) 
        throw new NullPointerException("task");
    
    if (unit == null) 
        throw new NullPointerException("unit");
    

    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Add the timeout to the wheel.
    HashedWheelTimeout timeout;
    lock.readLock().lock();
    try 
        timeout = new HashedWheelTimeout(task, deadline);
        if (workerState.get() == WORKER_STATE_SHUTDOWN) 
            throw new IllegalStateException("Cannot enqueue after shutdown");
        
        wheel[timeout.stopIndex].add(timeout);
     finally 
        lock.readLock().unlock();
    

    return timeout;


HashedWheelTimeout(TimerTask task, long deadline) 
    this.task = task;
    this.deadline = deadline;

    long calculated = deadline / tickDuration;
    final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
    stopIndex = (int) (ticks & mask);
    remainingRounds = (calculated - tick) / wheel.length;

可以看到,提交一个任务,会根据delay的时间计算出deadline时间点。然后构建一个HashedWheelTimeout对象,计算出轮数以及对应的格子。加入到格子链表中。这里可能有并发问题,所以放在lock块内。注意unlock需要在finally块里。

 

可以看到netty下的时间轮实现非常精巧,本身代码量不多,新版本和老版本的实现略有不同。主要体现在老版本会有一个单独的任务队列,提交新任务时,不会直接加入到格子对应的队列中,而是先加入到一个任务列表里,然后再tick时,一次性将10w个任务加入到时间轮格子里,这样会复杂一些,但是不涉及并发问题。新版本实现允许新提交的任务直接加入到格子队列里,但是需要加锁。

以上是关于Netty HashedWheelTimer 介绍的主要内容,如果未能解决你的问题,请参考以下文章

netty定时器HashedWheelTimer(zz)

netty系列之:HashedWheelTimer一种定时器的高效实现

nettynetty HashedWheelTimer 延时队列

javaHashedWheelTimer 使用及源码分析

分布式任务调度架构原理和设计介绍

基于时间轮的定时器HashedWheelTimer