Netty HashedWheelTimer 介绍
Posted 绝世好阿狸
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty HashedWheelTimer 介绍相关的知识,希望对你有一定的参考价值。
HashedWheelTimer是netty开发包里时间轮组件,可以用于提交延迟任务。Java里的Time组件也具备相同的功能,不过Time是基于优先队列实现的,相当于需要对所有的任务基于执行时间排个序,复杂度是logn。而HashedWheelTimer是另一种思想,预先放置一定数量的任务槽,任务提交时,根据延迟时间放入对应的槽位。工作线程利用sleep模拟tick,将到达的槽位里的任务取出,依次执行。极端情况下是On,如果槽位足够大,接近于O1。
使用
public static void main(String args[])
HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 100);
timer.start();
TimerTask task = new TimerTask()
@Override
public void run(Timeout timeout) throws Exception
System.out.println("任务执行");
;
timer.newTimeout(task, 3, TimeUnit.SECONDS);
timer.newTimeout(task, 4, TimeUnit.SECONDS);
原理
1.接口
HashedWheelTimer类实现了Timer接口,Timer接口是netty包下的接口,和Java自带的Timer类不同。先看一下这个接口:
public interface Timer
/**
* Schedules the specified @link TimerTask for one-time execution after
* the specified delay.
*
* @return a handle which is associated with the specified task
*
* @throws IllegalStateException if this timer has been
* @linkplain #stop() stopped already
*/
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
/**
* Releases all resources acquired by this @link Timer and cancels all
* tasks which were scheduled but not executed yet.
*
* @return the handles associated with the tasks which were canceled by
* this method
*/
Set<Timeout> stop();
可以看到Timer接口的作用是:提供了一种延迟执行任务的能力。里面包含两个核心方法:newTimeout以及stop方法。newTimeout方法允许我们提交一个延迟任务。stop方法停止当前时间轮,并立即执行未执行完的任务。这里我们重点看下newTimeout方法,参数一是TimerTask类型,是netty对于延迟任务的封装接口:
public interface TimerTask
/**
* Executed after the delay specified with
* @link Timer#newTimeout(TimerTask, long, TimeUnit).
*
* @param timeout a handle which is associated with this task
*/
void run(Timeout timeout) throws Exception;
其核心代码就是run方法。run方法有一个Timeout类型的参数,这是netty包下的另一个接口:
public interface Timeout
/**
* Returns the @link Timer that created this handle.
*/
Timer timer();
/**
* Returns the @link TimerTask which is associated with this handle.
*/
TimerTask task();
/**
* Returns @code true if and only if the @link TimerTask associated
* with this handle has been expired.
*/
boolean isExpired();
/**
* Returns @code true if and only if the @link TimerTask associated
* with this handle has been cancelled.
*/
boolean isCancelled();
/**
* Attempts to cancel the @link TimerTask associated with this handle.
* If the task has been executed or cancelled already, it will return with
* no side effect.
*
* @return True if the cancellation completed successfully, otherwise false
*/
boolean cancel();
这个接口是干啥的?代表一个TimerTask的执行状态,类似Java里的FutureTask,不仅仅包含了任务信息,也包含任务执行的状态,比如通过Timeout,我们可以拿到任务本身,还可以拿到任务执行结果以及执行该任务的Timer类实例等信息。至此,核心接口职责基本清晰。
2.构造函数
下面先看HashedWheelTimer的构造函数:
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel)
if (threadFactory == null)
throw new NullPointerException("threadFactory");
if (unit == null)
throw new NullPointerException("unit");
if (tickDuration <= 0)
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
if (ticksPerWheel <= 0)
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
this.tickDuration = unit.toNanos(tickDuration);
// Prevent overflow.
if (this.tickDuration >= Long.MAX_VALUE / wheel.length)
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
workerThread = threadFactory.newThread(worker);
leak = leakDetector.open(this);
看一下关键的参数:
* @param tickDuration 时间轮走一个花费的时间
* @param unit 时间单位
* @param ticksPerWheel 一轮总共多少格
先初始化了wheel对象,wheel对象是一个数组,每一个元素就是时间轮的一个格子。格子里存放的是任务列表,netty用的是set结构。关于wheel变量的初始化在createWheel函数中:
private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel)
if (ticksPerWheel <= 0)
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
if (ticksPerWheel > 1073741824)
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++)
wheel[i] = Collections.newSetFromMap(
PlatformDependent.<HashedWheelTimeout, Boolean>newConcurrentHashMap());
return wheel;
根据格子数初始化时间轮数据。最终格子数是2的n次方,这样是为了将取摸运算转换为位运算。
构造完wheel变量以后,接着初始化tickDuration变量和线程变量,将worker工作线程装入主线程对象中。
3.启动
public void start()
switch (workerState.get())
case WORKER_STATE_INIT:
if (workerState.compareAndSet(WORKER_STATE_INIT, WORKER_STATE_STARTED))
workerThread.start();
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
// Wait until the startTime is initialized by the worker.
while (startTime == 0)
try
startTimeInitialized.await();
catch (InterruptedException ignore)
// Ignore - it will be ready very soon.
start方法使用cas将时间轮的state变成开始状态,然后启动工作线程。
4.工作线程
public void run()
// Initialize the startTime.
startTime = System.nanoTime();
if (startTime == 0)
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
// Notify the other threads waiting for the initialization at start().
startTimeInitialized.countDown();
List<HashedWheelTimeout> expiredTimeouts = new ArrayList<HashedWheelTimeout>();
do
final long deadline = waitForNextTick();
if (deadline > 0)
fetchExpiredTimeouts(expiredTimeouts, deadline);
notifyExpiredTimeouts(expiredTimeouts);
while (workerState.get() == WORKER_STATE_STARTED);
这里是netty 4.0.19版本的实现,和早期的时间轮实现不同。核心方法在do while循环体内。首先调用waitForNextTick方法sleep到下一个格子,然后将格子里的任务拿出来执行。
private long waitForNextTick()
long deadline = tickDuration * (tick + 1);
for (;;)
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0)
if (currentTime == Long.MIN_VALUE)
return -Long.MAX_VALUE;
else
return currentTime;
// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
if (PlatformDependent.isWindows())
sleepTimeMs = sleepTimeMs / 10 * 10;
try
Thread.sleep(sleepTimeMs);
catch (InterruptedException e)
if (workerState.get() == WORKER_STATE_SHUTDOWN)
return Long.MIN_VALUE;
这个方法首先计算出下一个tick的时间,然后减去当前时间,计算出需要sleep的时间,sleep。
private void fetchExpiredTimeouts(
List<HashedWheelTimeout> expiredTimeouts, long deadline)
// Find the expired timeouts and decrease the round counter
// if necessary. Note that we don't send the notification
// immediately to make sure the listeners are called without
// an exclusive lock.
lock.writeLock().lock();
try
fetchExpiredTimeouts(expiredTimeouts, wheel[(int) (tick & mask)].iterator(), deadline);
finally
// Note that the tick is updated only while the writer lock is held,
// so that newTimeout() and consequently new HashedWheelTimeout() never see an old value
// while the reader lock is held.
tick ++;
lock.writeLock().unlock();
这里根据当前tick值取到对应的格子,因为其他线程可能会提交任务,涉及到并发修改集合,所以这里加了锁。这块的实现和早期的netty实现不同。
private void fetchExpiredTimeouts(
List<HashedWheelTimeout> expiredTimeouts,
Iterator<HashedWheelTimeout> i, long deadline)
while (i.hasNext())
HashedWheelTimeout timeout = i.next();
if (timeout.remainingRounds <= 0)
i.remove();
if (timeout.deadline <= deadline)
expiredTimeouts.add(timeout);
else
// The timeout was placed into a wrong slot. This should never happen.
throw new Error(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
else
timeout.remainingRounds --;
定位到具体的格子后,将set里的所有任务遍历一遍,如果圈数和执行时间都到了,就加入待执行list,否则圈数没到,将圈数减一。
private void notifyExpiredTimeouts(
List<HashedWheelTimeout> expiredTimeouts)
// Notify the expired timeouts.
for (int i = expiredTimeouts.size() - 1; i >= 0; i --)
expiredTimeouts.get(i).expire();
// Clean up the temporary list.
expiredTimeouts.clear();
拿到当前格子里可执行的任务后,逐个调用其expire方法执行任务,最终调用clear从格子内移除。下面再看下HasheWheelTimeout的实现,该类是之前介绍的Timeout接口的实现类:
public void expire()
if (!state.compareAndSet(ST_INIT, ST_EXPIRED))
return;
try
task.run(this);
catch (Throwable t)
if (logger.isWarnEnabled())
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
可以看到,其expire方法就是调用对应的任务的run方法。
5.任务提交
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit)
start();
if (task == null)
throw new NullPointerException("task");
if (unit == null)
throw new NullPointerException("unit");
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Add the timeout to the wheel.
HashedWheelTimeout timeout;
lock.readLock().lock();
try
timeout = new HashedWheelTimeout(task, deadline);
if (workerState.get() == WORKER_STATE_SHUTDOWN)
throw new IllegalStateException("Cannot enqueue after shutdown");
wheel[timeout.stopIndex].add(timeout);
finally
lock.readLock().unlock();
return timeout;
HashedWheelTimeout(TimerTask task, long deadline)
this.task = task;
this.deadline = deadline;
long calculated = deadline / tickDuration;
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
stopIndex = (int) (ticks & mask);
remainingRounds = (calculated - tick) / wheel.length;
可以看到,提交一个任务,会根据delay的时间计算出deadline时间点。然后构建一个HashedWheelTimeout对象,计算出轮数以及对应的格子。加入到格子链表中。这里可能有并发问题,所以放在lock块内。注意unlock需要在finally块里。
可以看到netty下的时间轮实现非常精巧,本身代码量不多,新版本和老版本的实现略有不同。主要体现在老版本会有一个单独的任务队列,提交新任务时,不会直接加入到格子对应的队列中,而是先加入到一个任务列表里,然后再tick时,一次性将10w个任务加入到时间轮格子里,这样会复杂一些,但是不涉及并发问题。新版本实现允许新提交的任务直接加入到格子队列里,但是需要加锁。
以上是关于Netty HashedWheelTimer 介绍的主要内容,如果未能解决你的问题,请参考以下文章
netty系列之:HashedWheelTimer一种定时器的高效实现