时间轮在NettyKafka中的应用
Posted 总要冲动一次
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了时间轮在NettyKafka中的应用相关的知识,希望对你有一定的参考价值。
时间轮
概述
时间轮是一个高性能、低消耗的数据结构,它适合用非准实时,延迟的短平快任务,例如心跳检测。在Netty、Kafka、Zookeeper中都有使用。
时间轮可通过时间与任务存储分离的形式,轻松实现百亿级海量任务调度。
Netty中的时间轮
作用
Netty动辄管理100w+的连接,每一个连接都会有很多超时任务。比如发送超时、心跳检测间隔等,如果每一个定时任务都启动一个Timer,不仅低效,而且会消耗大量的资源。
抽象
时间轮 | 时间轮的格子 | 格子里的任务 | 时间轮运转线程 |
---|---|---|---|
HashedWheelTimer | HashedWheelBucket | HashedWheelTimeout | Worker |
其他一些属性:
时间轮零点时间:startTime
当前指针所指格子:tick
格子长度(持续时间):tickDuration
时间轮运转轮次、回合:remainingRounds
任务截止时间、触发时间(相对时间轮的startTime):deadline
概括时间轮工作流程
(阅读Netty3.10.6)
1、时间轮的启动并不是在构造函数中,而是在第一次提交任务的时候newTimeout()
2、启动时间轮第一件事就是初始化时间轮的零点时间startTime,以后时间轮上的任务、格子触发时间计算都相对这个时间
3、随着时间的推移第一个格子(tick)触发,在触发每个格子之前都是处于阻塞状态,并不是直接去处理这个格子的所有任务,而是先从任务队列timeouts中拉取最多100000个任务,根据每个任务的触发时间deadline放在不同的格子里(注意,Netty中会对时间轮上的每一个格子进行处理,即使这个格子没有任务)
4、时间轮运转过程中维护着一个指针tick,根据当前指针获取对应的格子里的所有任务进行处理
5、任务自身维护了一个剩余回合(remainingRounds),代表任务在哪一轮执行处理,只有该值为0时才进行处理
源码
代码做了删减,只体现重点
时间轮构造器:
初始化了时间轮大小、每个格子大小、时间轮运转线程
public HashedWheelTimer(
ThreadFactory threadFactory,
ThreadNameDeterminer determiner,
long tickDuration, TimeUnit unit, int ticksPerWheel)
// TODO : 创建时间轮底层存储任务的数据结构
wheel = createWheel(ticksPerWheel);
// TODO : 求某一个任务落到哪个格子时需要用到的编码
mask = wheel.length - 1;
// TODO : 每个格子的时间
this.tickDuration = unit.toNanos(tickDuration);
// TODO : 时间轮处理任务的线程
workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
worker, "Hashed wheel timer #" + id.incrementAndGet(),
determiner));
// TODO : 时间轮真正存储数据的容器
private final HashedWheelBucket[] wheel;
// TODO : 存放任务的队列
private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<HashedWheelTimeout>();
外界提交任务的时候,代码如下
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit)
// TODO : 启动时间轮运转线程
start();
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
// TODO : 任务放入到队列中,并没有一开始就放到时间轮上
timeouts.add(timeout);
return timeout;
时间轮运转执行任务,代码如下
public void run()
// TODO : 初始化时间轮的
startTime = System.nanoTime();
do
// TODO : 这个方法会阻塞,随着时间的推动会触发新的任务(tick),返回当前时间
final long deadline = waitForNextTick();
if (deadline > 0)
// TODO : 将队列中的任务最多取100000放到时间轮上
transferTimeoutsToBuckets();
// TODO : 获取当前格子
HashedWheelBucket bucket = wheel[(int) (tick & mask)];
// TODO : 执行时间轮上当前格子上的任务
bucket.expireTimeouts(deadline);
// TODO : 指针走动
tick++;
while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
run内部方法解析
waitForNextTick等待下一个格子触发,代码如下
private long waitForNextTick()
// TODO : 截止时间、触发时间
// TODO : 获取当前格子的触发时间,因为时间轮底层是使用数组存储任务数据,所以tick需要+1
long deadline = tickDuration * (tick + 1);
/**
* tick : 时间轮上的格子
* tickDuration : 每个格子的长度,持续时间
* deadline : 这里表示下一个格子的触发时间(触发一个格子的任务)相对时间轮起点时间(startTime)的时长
*/
for (;;)
// TODO : 相对时间轮起点的当前时间
final long currentTime = System.nanoTime() - startTime;
// TODO : 当当前时间大于等于deadline的时候,就会跳出循环
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0)
if (currentTime == Long.MIN_VALUE)
return -Long.MAX_VALUE;
else
return currentTime;
try
// TODO : 并不是一直循环
Thread.sleep(sleepTimeMs);
catch (InterruptedException e)
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN)
return Long.MIN_VALUE;
transferTimeoutsToBuckets将队列中任务存储到时间轮上,代码如下
private void transferTimeoutsToBuckets()
for (int i = 0; i < 100000; i++)
// TODO : 从队列中取出任务
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null)
// all processed 已全部处理
break;
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED
|| !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET))
// 期间被取消。所以只需从队列中删除它并继续下一个 HashedWheelTimeout
timeout.remove();
continue;
// TODO : 计算这个任务要走多少个格子
long calculated = timeout.deadline / tickDuration;
// TODO : 计算触发当前这个任务还要走多少轮,剩余回合!
/**
* calculated:触发该任务一共要走的格子数
* tick:当前已经走的格子数
* wheel.length:时间轮的长度
*/
long remainingRounds = (calculated - tick) / wheel.length;
// TODO : 任务自身携带了触发自己的轮次
timeout.remainingRounds = remainingRounds;
final long ticks = Math.max(calculated, tick);
// TODO : mask = wheel.length - 1
int stopIndex = (int) (ticks & mask);
// TODO : 将任务放到时间轮的对应格子中
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
expireTimeouts执行处理任务,代码如下
public void expireTimeouts(long deadline)
HashedWheelTimeout timeout = head;
while (timeout != null)
boolean remove = false;
// TODO : 根据剩余回合判断是否要处理该任务,如果大于0说明还没轮到该任务
if (timeout.remainingRounds <= 0)
// TODO : 如果时间已经到了,则执行任务
/**
* deadline 是相对时间轮startTime的当前时间,也是当前格子的触发时间
* timeout.deadline 是任务的触发时间
*/
if (timeout.deadline <= deadline)
// TODO :
timeout.expire();
else
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
remove = true;
else if (timeout.isCancelled())
remove = true;
else
timeout.remainingRounds --;
// store reference to next as we may null out timeout.next in the remove block.
HashedWheelTimeout next = timeout.next;
if (remove)
remove(timeout);
timeout = next;
Kafka中的时间轮
作用
Produce 时等待 ISR 副本复制成功、延迟删除主题、会话超时检查、延迟创建主题或分区等,会被封装成不同的 DelayOperation 进行延迟处理操作,防止阻塞 Kafka请求处理线程。
抽象
名称 | 时间轮 | 时间轮的格子(桶) | 格子(桶)里的任务 | 时间轮运转线程 | 处理过期任务线程 |
---|---|---|---|---|---|
类名 | TimingWheel | TimerTaskList | TimerTaskEntry | ShutdownableThread | ExecutorService |
属性名 | timingWheel | bucket | root\\head\\tail | expirationReaper | taskExecutor |
其他一些属性:
时间轮零点时间:startMs
当前时间:currentTime
格子长度(持续时间):tickMs
时间轮大小:wheelSize
时间轮的当前层时间跨度:interval = tickMs * wheelSize
到期时间:expiration
溢出轮、升层的时间轮:overflowWheel: TimingWheel
概括时间轮工作流程
(阅读Kafka-3.1.0)
Kafka 中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。
1、Kafka启动的时候就启动了时间轮
2、ExpiredOperationReaper.doWork() 循环执行,首先从全局的delayQueue中获取一个bucket,如果不为空则上锁处理
3、根据bucket的到期时间尝试推进,然后会刷一次bucket中的所有任务,这些任务要么是需要立即执行的(即到期时间在 currentTime 和 currentTime + tickMs 之间),要么是需要换桶的,往前移位(即到期时间大于等于 currentTime + tickMs);立即计算的直接提交给专门的线程处理
4、最后拉取delayQueue中下一个bucket处理,一直循环下去
5、添加一个任务,首先是根据任务的到期时间expiration来判断自己会落到哪一个bucket,如果expiration不小于currentTime + tickMs,则可能是当前时间轮的任一个bucket,也可能是溢出轮中的任一个bucket
6、当任务添加到某一个bucket后会判断是否跟新了桶的到期时间,如果更新了则需要入队处理delayQueue.offer
源码
代码做了删减,只体现重点
1、Kafka中自己封装了一个可关闭的线程类 Shutdown’able’Thread ,也就是实现了该类的 ExpiredOperationReaper 内部实现了 doWork() 方法,维护着时间轮的运转
private class ExpiredOperationReaper extends ShutdownableThread(
"ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
false)
override def doWork(): Unit =
advanceClock(200L)
2、推进时钟的内部实现
def advanceClock(timeoutMs: Long): Boolean =
// TODO : 阻塞 timeoutMs = 200 毫秒,拉取一个桶:有直接返回,没有则阻塞200毫秒
var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
if (bucket != null)
writeLock.lock()
try
while (bucket != null)
// TODO : 传入当前桶的过期时间,尝试推进时间
timingWheel.advanceClock(bucket.getExpiration)
// TODO : 无论推进时间是否成功,当前桶的这些任务要么是需要立即执行的(即到期时间在 currentTime 和 currentTime + tickMs 之间),
// 要么是需要换桶的,往前移位(即到期时间大于等于 currentTime + tickMs);立即计算的直接提交给专门的线程处理
bucket.flush(addTimerTaskEntry)
// TODO : 进行下一个桶处理
bucket = delayQueue.poll()
finally
writeLock.unlock()
true
else
false
3、尝试推进时钟
def advanceClock(timeMs: Long): Unit =
/**
* currentTime + tickMs :当前桶过期时间的截止时间
* timeMs :下一个桶的过期时间
*/
if (timeMs >= currentTime + tickMs)
// currentTime 是 tickMs 的整数倍
currentTime = timeMs - (timeMs % tickMs)
// TODO : 尝试推进溢出轮的时间
if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
4、bucket.flush(addTimerTaskEntry) 传入的是一个方法之后桶内的每一个任务都会走一次该方法
// TODO : 添加或处理任务
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit =
// TODO : 只有到期时间在 currentTime 和 currentTime + tickMs 之间的任务才会被直接处理
if (!timingWheel.add(timerTaskEntry))
// Already expired or cancelled
if (!timerTaskEntry.cancelled)
// TODO : 只处理过期时间到达且不是被取消的任务
taskExecutor.submit(timerTaskEntry.timerTask)
5、添加任务到时间轮的入口也是地4步的方法,其中timingWheel.add(timerTaskEntry) 方法中会判断每一个任务是立即处理还是入队
/**
* 添加一个任务
* 添加任务的过程比较复杂,首先是根据任务的到期时间来判断自己会落到哪一个bucket,可能是当前时间轮任一个bucket,也可能是溢出轮中的任一个bucket
*/
def add(timerTaskEntry: TimerTaskEntry): Boolean =
// TODO : 任务到期时间
val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled)
false
else if (expiration < currentTime + tickMs)
// TODO : 距离该任务到期仅剩最多 tickMs 毫秒了
// TODO : currentTime当前指向的时间格也属于到期部分,表示刚好到期
false
else if (expiration < currentTime + interval)
// TODO : 距离该任务到期小于一整轮的时间,大于一个格子的时间,说明它就在当前层,不需要升层
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)
// TODO : 如果该任务的到来改变了他所进入的桶的过期时间,即轮子已经前进并且之前的桶被重用了
// TODO : 桶是同一个桶,但是数据可能不是同一轮的,这时需要重新入队 DelayQueue
if (bucket.setExpiration(virtualId * tickMs))
queue.offer(bucket)
true
else
// TODO : 需要升层 过期时间超过了 interval
if (overflowWheel == null) addOverflowWheel()
overflowWheel.add(timerTaskEntry)
需要升层的情况:其实每一个时间轮对象内都有一个溢出轮的指针 overflowWheel ,他会指向父级时间轮。
总结
Kafka 使用时间轮来实现延时队列,因为其底层是任务的添加和删除是基于链表实现的,是 O(1) 的时间复杂度,满足高性能的要求;
对于时间跨度大的延时任务,Kafka 引入了层级时间轮,能更好控制时间粒度,可以应对更加复杂的定时任务处理场景;
对于如何实现时间轮的推进和避免空推进影响性能,Kafka 采用空间换时间的思想,通过 DelayQueue 来推进时间轮,算是一个经典的 trade off。
以上是关于时间轮在NettyKafka中的应用的主要内容,如果未能解决你的问题,请参考以下文章