深度剖析 Vue3 的调度系统
Posted 全栈修仙之路
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深度剖析 Vue3 的调度系统相关的知识,希望对你有一定的参考价值。
等响应式数据的变化这里指点击按钮触发的 click 回调中,响应式数据 count.value 被修改
函数,该函数会对比组件 data 更新前的 VNode 和组件 data 更新后的 VNode,对比之间的差异,修改差异部分的 DOM。该过程叫 patch,比较 vnode 的方法叫 diff 算法(因为这里没有篇幅展开,因此大概看看记住 instance.update
的特点即可)instance
是指 Vue 内部的组件实例,我们直接使用接触不到该实例。
instance.update
是深度更新,即除了会更新组件本身,还会递归调用子组件的 instance.update
,因此,这个过程会更新整个组件树。
instance.update
会更新该组件的属性(如果父组件的传入发生变化),然后更新它对应的 DOM
**响应式数据更新 ≠ 组件 DOM **更新,响应式数据更新,只是变量值的改变,此时还没修改 DOM,但会立即执行 queueJob(instance.update)
,将组件 DOM 更新任务,加入到队列。即数据修改是立即生效的,但 DOM 修改是延迟执行
把自身的 Job 加入到队列中。为什么会需要递归?
先做个类比,应该就大概明白了:
你刚拖好地,你儿子就又把地板踩脏了,你只有重新再拖一遍。
如果你一直拖,儿子一直踩,就是无限递归了。。。这时候就应该把儿子打一顿。。。
在组件 DOM 更新(instance.update)的过程中,可能会导致自身依赖的响应式变量改变,从而调用 queueJob,将自身 Job 加入到队列。
由于响应式数据被改变(因为脏了),需要整个组件重新更新(所以需要重新拖地)
下图就是一个组件 DOM 更新过程中,导致响应式变量变化的例子:
父组件刚更新完,子组件由于属性更新,立即触发 watch,emit 事件,修改了父组件的 loading 响应式变量,导致父组件需要重新更新。
(watch 一般情况下,是加入到 Pre 队列等待执行,但在组件 DOM 更新时,watch也是加入队列,但会立即执行并清空 Pre 队列,暂时先记住有这个小特性即可)
—— flushJob 会在下一个微任务时执行。为什么执行时机为下一个微任务?为什么不能是 setTimeout(flushJob, 0)
我们目的,是延迟执行 queueJob,等所有组件数据都更新完,再执行组件 DOM 更新(instance.update)。
要达到这一目的:我们只需要等在下一个浏览器任务,执行 queueJob 即可
因为,响应式数据的更新,都在当前的浏览器任务中。当 queueJob 作为微任务执行时,就表明上一个任务一定已经完成了。
而在浏览器中,微任务比宏任务有更高的优先级,因此 queueJob 使用微任务。
浏览器事件循环示意图如下:
每次循环,浏览器只会取一个宏任务执行,而微任务则是执行全部,在微任务执行 queueJob,能在最快时间执行队列,并且接下来浏览器就会执行渲染页面,更新UI。
否则,如果 queueJob 使用宏任务,极端情况下,可能会有多个宏任务在 queueJob 之前,而每次事件循环,只会取一个宏任务,则 queueJob 的执行时机会在非常的后,这对用户体验来说是有一定的伤害的
至此,我们已经把下图蓝色部分都解析完了:
剩下的是红色部分,即函数 flushJob 部分的实现了:
队列的执行 flushJob
function flushJobs()
// 等待状态设置为 false
isFlushPending = false
// 标记队列为正在执行状态
isFlushing = true
// 执行 Pre 队列
flushPreFlushCbs()
// 根据 job id 进行排序,从小到大
queue.sort((a, b) => getId(a) - getId(b))
// 用于检测是否是无限递归,最多 100 层递归,否则就报错,只会开发模式下检查
const check = __DEV__
? (job: SchedulerJob) => checkRecursiveUpdates(seen!, job)
: NOOP
try
// 循环组件异步更新队列,执行 job
for (flushIndex = 0; flushIndex < queue.length; flushIndex++)
const job = queue[flushIndex]
// 仅在 active 时才调用 job
if (job && job.active !== false)
// 检查无限递归
if (__DEV__ && check(job))
continue
// 调用 job,带有错误处理
callWithErrorHandling(job, null, ErrorCodes.SCHEDULER)
finally
// 收尾工作,重置这些用于标记的变量
flushIndex = 0 // 将队列执行的 index 重置
queue.length = 0 // 清空队列
// 执行 Post 队列
flushPostFlushCbs()
isFlushing = false
currentFlushPromise = null
// 如果还有 Job,继续执行队列
// Post 队列运行过程中,可能又会将 Job 加入进来,会在下一轮 flushJob 执行
if (
queue.length ||
pendingPreFlushCbs.length ||
pendingPostFlushCbs.length
)
flushJobs()
flushJob 主要执行以下内容:
执行 Pre 队列 执行queue 队列 执行 Post 队列 循环重新执行所有队列,直到所有队列都为空
执行 queue 队列queue 队列执行对应的是这一部分:
try
// 循环组件异步更新队列,执行 job
for (flushIndex = 0; flushIndex < queue.length; flushIndex++)
const job = queue[flushIndex]
// 仅在 active 时才调用 job
if (job && job.active !== false)
// 检查无限递归
if (__DEV__ && check(job))
continue
// 调用 job,带有错误处理
callWithErrorHandling(job, null, ErrorCodes.SCHEDULER)
finally
// 收尾工作,重置这些用于标记的变量
flushIndex = 0 // 将队列执行的 index 重置
queue.length = 0 // 清空队列
循环遍历 queue,运行 Job,直到 queue 为空
queue 队列执行期间,可能会有新的 Job 入队,同样会被执行。
执行 Pre 队列export function flushPreFlushCbs()
// 有 Job 才执行
if (pendingPreFlushCbs.length)
// 执行前去重,并赋值到 activePreFlushCbs
activePreFlushCbs = [...new Set(pendingPreFlushCbs)]
// pendingPreFlushCbs 清空
pendingPreFlushCbs.length = 0
// 循环执行 Job
for (
preFlushIndex = 0;
preFlushIndex < activePreFlushCbs.length;
preFlushIndex++
)
// 开发模式下,校验无限递归的情况
if (
__DEV__ &&
checkRecursiveUpdates(seen!, activePreFlushCbs[preFlushIndex])
)
continue
// 执行 Job
activePreFlushCbs[preFlushIndex]()
// 收尾工作
activePreFlushCbs = null
preFlushIndex = 0
// 可能递归,再次执行 flushPreFlushCbs,如果队列为空就停止
flushPreFlushCbs()
主要流程如下:
Job 最开始是在 pending 队列中的 flushPreFlushCbs 执行时,将 pending 队列中的 Job 去重,并改为 active 队列 循环执行 active 队列的 Job 重复 flushPreFlushCbs,直到队列为空
执行 Post 队列export function flushPostFlushCbs(seen?: CountMap)
// 队列为空则结束
if (pendingPostFlushCbs.length)
// 去重
const deduped = [...new Set(pendingPostFlushCbs)]
pendingPostFlushCbs.length = 0
// #1947 already has active queue, nested flushPostFlushCbs call
// 特殊情况,发生了递归,在执行前 activePostFlushCbs 可能已经有值了,该情况可不必过多关注
if (activePostFlushCbs)
activePostFlushCbs.push(...deduped)
return
activePostFlushCbs = deduped
if (__DEV__)
seen = seen || new Map()
// 优先级排序
activePostFlushCbs.sort((a, b) => getId(a) - getId(b))
// 循环执行 Job
for (
postFlushIndex = 0;
postFlushIndex < activePostFlushCbs.length;
postFlushIndex++
)
// 在开发模式下,检查递归次数,最多 100 次递归
if (
__DEV__ &&
checkRecursiveUpdates(seen!, activePostFlushCbs[postFlushIndex])
)
continue
// 执行 Job
activePostFlushCbs[postFlushIndex]()
// 收尾工作
activePostFlushCbs = null
postFlushIndex = 0
主要流程如下:
Job 最开始是在 pending 队列中的 flushPostFlushCbs 执行时,将 pending 队列中的 Job 去重,然后跟 active 队列合并 循环执行 active 队列的 Job
为什么在队列最后没有像 Pre 队列那样,再次执行 flushPostFlushCbs?
Post 队列的 Job 执行时,可能会将 Job 继续加入到队列(Pre 队列,组件异步更新队列,Post 队列都可能)
新加入的 Job,会在下一轮 flushJob 中执行:
// postFlushCb 可能又会将 Job 加入进来,如果还有 Job,继续执行
if (
queue.length ||
pendingPreFlushCbs.length ||
pendingPostFlushCbs.length
)
// 执行下一轮队列任务
flushJobs()
instance
是指 Vue 内部的组件实例,我们直接使用接触不到该实例。
instance.update
是深度更新,即除了会更新组件本身,还会递归调用子组件的 instance.update
,因此,这个过程会更新整个组件树。
instance.update
会更新该组件的属性(如果父组件的传入发生变化),然后更新它对应的 DOM
**响应式数据更新 ≠ 组件 DOM **更新,响应式数据更新,只是变量值的改变,此时还没修改 DOM,但会立即执行 queueJob(instance.update)
,将组件 DOM 更新任务,加入到队列。即数据修改是立即生效的,但 DOM 修改是延迟执行
为什么会需要递归?
先做个类比,应该就大概明白了:
你刚拖好地,你儿子就又把地板踩脏了,你只有重新再拖一遍。
如果你一直拖,儿子一直踩,就是无限递归了。。。这时候就应该把儿子打一顿。。。
在组件 DOM 更新(instance.update)的过程中,可能会导致自身依赖的响应式变量改变,从而调用 queueJob,将自身 Job 加入到队列。
由于响应式数据被改变(因为脏了),需要整个组件重新更新(所以需要重新拖地)
下图就是一个组件 DOM 更新过程中,导致响应式变量变化的例子:
父组件刚更新完,子组件由于属性更新,立即触发 watch,emit 事件,修改了父组件的 loading 响应式变量,导致父组件需要重新更新。
(watch 一般情况下,是加入到 Pre 队列等待执行,但在组件 DOM 更新时,watch也是加入队列,但会立即执行并清空 Pre 队列,暂时先记住有这个小特性即可)
—— flushJob 会在下一个微任务时执行。为什么执行时机为下一个微任务?为什么不能是 setTimeout(flushJob, 0)
我们目的,是延迟执行 queueJob,等所有组件数据都更新完,再执行组件 DOM 更新(instance.update)。
要达到这一目的:我们只需要等在下一个浏览器任务,执行 queueJob 即可
因为,响应式数据的更新,都在当前的浏览器任务中。当 queueJob 作为微任务执行时,就表明上一个任务一定已经完成了。
而在浏览器中,微任务比宏任务有更高的优先级,因此 queueJob 使用微任务。
浏览器事件循环示意图如下:
每次循环,浏览器只会取一个宏任务执行,而微任务则是执行全部,在微任务执行 queueJob,能在最快时间执行队列,并且接下来浏览器就会执行渲染页面,更新UI。
否则,如果 queueJob 使用宏任务,极端情况下,可能会有多个宏任务在 queueJob 之前,而每次事件循环,只会取一个宏任务,则 queueJob 的执行时机会在非常的后,这对用户体验来说是有一定的伤害的
至此,我们已经把下图蓝色部分都解析完了:
剩下的是红色部分,即函数 flushJob 部分的实现了:
队列的执行 flushJob
function flushJobs()
// 等待状态设置为 false
isFlushPending = false
// 标记队列为正在执行状态
isFlushing = true
// 执行 Pre 队列
flushPreFlushCbs()
// 根据 job id 进行排序,从小到大
queue.sort((a, b) => getId(a) - getId(b))
// 用于检测是否是无限递归,最多 100 层递归,否则就报错,只会开发模式下检查
const check = __DEV__
? (job: SchedulerJob) => checkRecursiveUpdates(seen!, job)
: NOOP
try
// 循环组件异步更新队列,执行 job
for (flushIndex = 0; flushIndex < queue.length; flushIndex++)
const job = queue[flushIndex]
// 仅在 active 时才调用 job
if (job && job.active !== false)
// 检查无限递归
if (__DEV__ && check(job))
continue
// 调用 job,带有错误处理
callWithErrorHandling(job, null, ErrorCodes.SCHEDULER)
finally
// 收尾工作,重置这些用于标记的变量
flushIndex = 0 // 将队列执行的 index 重置
queue.length = 0 // 清空队列
// 执行 Post 队列
flushPostFlushCbs()
isFlushing = false
currentFlushPromise = null
// 如果还有 Job,继续执行队列
// Post 队列运行过程中,可能又会将 Job 加入进来,会在下一轮 flushJob 执行
if (
queue.length ||
pendingPreFlushCbs.length ||
pendingPostFlushCbs.length
)
flushJobs()
flushJob 主要执行以下内容:
执行 Pre 队列 执行queue 队列 执行 Post 队列 循环重新执行所有队列,直到所有队列都为空
执行 queue 队列queue 队列执行对应的是这一部分:
try
// 循环组件异步更新队列,执行 job
for (flushIndex = 0; flushIndex < queue.length; flushIndex++)
const job = queue[flushIndex]
// 仅在 active 时才调用 job
if (job && job.active !== false)
// 检查无限递归
if (__DEV__ && check(job))
continue
// 调用 job,带有错误处理
callWithErrorHandling(job, null, ErrorCodes.SCHEDULER)
finally
// 收尾工作,重置这些用于标记的变量
flushIndex = 0 // 将队列执行的 index 重置
queue.length = 0 // 清空队列
循环遍历 queue,运行 Job,直到 queue 为空
queue 队列执行期间,可能会有新的 Job 入队,同样会被执行。
执行 Pre 队列export function flushPreFlushCbs()
// 有 Job 才执行
if (pendingPreFlushCbs.length)
// 执行前去重,并赋值到 activePreFlushCbs
activePreFlushCbs = [...new Set(pendingPreFlushCbs)]
// pendingPreFlushCbs 清空
pendingPreFlushCbs.length = 0
// 循环执行 Job
for (
preFlushIndex = 0;
preFlushIndex < activePreFlushCbs.length;
preFlushIndex++
)
// 开发模式下,校验无限递归的情况
if (
__DEV__ &&
checkRecursiveUpdates(seen!, activePreFlushCbs[preFlushIndex])
)
continue
// 执行 Job
activePreFlushCbs[preFlushIndex]()
// 收尾工作
activePreFlushCbs = null
preFlushIndex = 0
// 可能递归,再次执行 flushPreFlushCbs,如果队列为空就停止
flushPreFlushCbs()
主要流程如下:
Job 最开始是在 pending 队列中的 flushPreFlushCbs 执行时,将 pending 队列中的 Job 去重,并改为 active 队列 循环执行 active 队列的 Job 重复 flushPreFlushCbs,直到队列为空
执行 Post 队列export function flushPostFlushCbs(seen?: CountMap)
// 队列为空则结束
if (pendingPostFlushCbs.length)
// 去重
const deduped = [...new Set(pendingPostFlushCbs)]
pendingPostFlushCbs.length = 0
// #1947 already has active queue, nested flushPostFlushCbs call
// 特殊情况,发生了递归,在执行前 activePostFlushCbs 可能已经有值了,该情况可不必过多关注
if (activePostFlushCbs)
activePostFlushCbs.push(...deduped)
return
activePostFlushCbs = deduped
if (__DEV__)
seen = seen || new Map()
// 优先级排序
activePostFlushCbs.sort((a, b) => getId(a) - getId(b))
// 循环执行 Job
for (
postFlushIndex = 0;
postFlushIndex < activePostFlushCbs.length;
postFlushIndex++
)
// 在开发模式下,检查递归次数,最多 100 次递归
if (
__DEV__ &&
checkRecursiveUpdates(seen!, activePostFlushCbs[postFlushIndex])
)
continue
// 执行 Job
activePostFlushCbs[postFlushIndex]()
// 收尾工作
activePostFlushCbs = null
postFlushIndex = 0
主要流程如下:
Job 最开始是在 pending 队列中的 flushPostFlushCbs 执行时,将 pending 队列中的 Job 去重,然后跟 active 队列合并 循环执行 active 队列的 Job
为什么在队列最后没有像 Pre 队列那样,再次执行 flushPostFlushCbs?
Post 队列的 Job 执行时,可能会将 Job 继续加入到队列(Pre 队列,组件异步更新队列,Post 队列都可能)
新加入的 Job,会在下一轮 flushJob 中执行:
// postFlushCb 可能又会将 Job 加入进来,如果还有 Job,继续执行
if (
queue.length ||
pendingPreFlushCbs.length ||
pendingPostFlushCbs.length
)
// 执行下一轮队列任务
flushJobs()
为什么执行时机为下一个微任务?为什么不能是 setTimeout(flushJob, 0)
function flushJobs()
// 等待状态设置为 false
isFlushPending = false
// 标记队列为正在执行状态
isFlushing = true
// 执行 Pre 队列
flushPreFlushCbs()
// 根据 job id 进行排序,从小到大
queue.sort((a, b) => getId(a) - getId(b))
// 用于检测是否是无限递归,最多 100 层递归,否则就报错,只会开发模式下检查
const check = __DEV__
? (job: SchedulerJob) => checkRecursiveUpdates(seen!, job)
: NOOP
try
// 循环组件异步更新队列,执行 job
for (flushIndex = 0; flushIndex < queue.length; flushIndex++)
const job = queue[flushIndex]
// 仅在 active 时才调用 job
if (job && job.active !== false)
// 检查无限递归
if (__DEV__ && check(job))
continue
// 调用 job,带有错误处理
callWithErrorHandling(job, null, ErrorCodes.SCHEDULER)
finally
// 收尾工作,重置这些用于标记的变量
flushIndex = 0 // 将队列执行的 index 重置
queue.length = 0 // 清空队列
// 执行 Post 队列
flushPostFlushCbs()
isFlushing = false
currentFlushPromise = null
// 如果还有 Job,继续执行队列
// Post 队列运行过程中,可能又会将 Job 加入进来,会在下一轮 flushJob 执行
if (
queue.length ||
pendingPreFlushCbs.length ||
pendingPostFlushCbs.length
)
flushJobs()
try
// 循环组件异步更新队列,执行 job
for (flushIndex = 0; flushIndex < queue.length; flushIndex++)
const job = queue[flushIndex]
// 仅在 active 时才调用 job
if (job && job.active !== false)
// 检查无限递归
if (__DEV__ && check(job))
continue
// 调用 job,带有错误处理
callWithErrorHandling(job, null, ErrorCodes.SCHEDULER)
finally
// 收尾工作,重置这些用于标记的变量
flushIndex = 0 // 将队列执行的 index 重置
queue.length = 0 // 清空队列
export function flushPreFlushCbs()
// 有 Job 才执行
if (pendingPreFlushCbs.length)
// 执行前去重,并赋值到 activePreFlushCbs
activePreFlushCbs = [...new Set(pendingPreFlushCbs)]
// pendingPreFlushCbs 清空
pendingPreFlushCbs.length = 0
// 循环执行 Job
for (
preFlushIndex = 0;
preFlushIndex < activePreFlushCbs.length;
preFlushIndex++
)
// 开发模式下,校验无限递归的情况
if (
__DEV__ &&
checkRecursiveUpdates(seen!, activePreFlushCbs[preFlushIndex])
)
continue
// 执行 Job
activePreFlushCbs[preFlushIndex]()
// 收尾工作
activePreFlushCbs = null
preFlushIndex = 0
// 可能递归,再次执行 flushPreFlushCbs,如果队列为空就停止
flushPreFlushCbs()
export function flushPostFlushCbs(seen?: CountMap)
// 队列为空则结束
if (pendingPostFlushCbs.length)
// 去重
const deduped = [...new Set(pendingPostFlushCbs)]
pendingPostFlushCbs.length = 0
// #1947 already has active queue, nested flushPostFlushCbs call
// 特殊情况,发生了递归,在执行前 activePostFlushCbs 可能已经有值了,该情况可不必过多关注
if (activePostFlushCbs)
activePostFlushCbs.push(...deduped)
return
activePostFlushCbs = deduped
if (__DEV__)
seen = seen || new Map()
// 优先级排序
activePostFlushCbs.sort((a, b) => getId(a) - getId(b))
// 循环执行 Job
for (
postFlushIndex = 0;
postFlushIndex < activePostFlushCbs.length;
postFlushIndex++
)
// 在开发模式下,检查递归次数,最多 100 次递归
if (
__DEV__ &&
checkRecursiveUpdates(seen!, activePostFlushCbs[postFlushIndex])
)
continue
// 执行 Job
activePostFlushCbs[postFlushIndex]()
// 收尾工作
activePostFlushCbs = null
postFlushIndex = 0
为什么在队列最后没有像 Pre 队列那样,再次执行 flushPostFlushCbs?
// postFlushCb 可能又会将 Job 加入进来,如果还有 Job,继续执行
if (
queue.length ||
pendingPreFlushCbs.length ||
pendingPostFlushCbs.length
)
// 执行下一轮队列任务
flushJobs()
深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
承接上文
承接上一章节的内容,下面我们看继续看拉取的调度模式,PULL与PUSH模式相比,PULL模式需要应用层不间断地进行拉取消息然后再执行消费处理,提高了应用层的编码复杂度,为了Pull方式的编程复杂度,RocketMQ提供了调度消费服务(MQPullConsumerScheduleService),在topic的订阅发送变化(初次订阅或距上次拉取消息超时)就触发PULL方式拉取消息。
MQPullConsumerScheduleService
MQPullConsumerScheduleService是PULL模式下面的调度服务,当RebalanceImpl.processQueueTable队列有变化时才进行消息的拉取,从而降低Pull方式的编程复杂度。在应用层按照如下方式使用:
使用MQPullConsumerScheduleService开发消费消息
实例化对象MQPullConsumerScheduleService
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
设置NameServer
scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("127.0.0.1:9876");
设置消费组为集群模式
scheduleService.setMessageModel(MessageModel.CLUSTERING);
注册拉取回调函数
scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback()
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context)
MQPullConsumer consumer = context.getPullConsumer();
try
long offset = consumer.fetchConsumeOffset(mq, false);
if (offset < 0)
offset = 0;
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
System.out.printf("%s%n", offset + "\\t" + mq + "\\t" + pullResult);
switch (pullResult.getPullStatus())
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
break;
default:
break;
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
context.setPullNextDelayTimeMillis(100);
catch (Exception e)
e.printStackTrace();
);
从上下文中获取MQPullConsumer对象,此处其实就是DefaultMQPullConsumer。
MQPullConsumer consumer = context.getPullConsumer();
获取该消费组的该队列的消费进度
long offset = consumer.fetchConsumeOffset(mq, false);
拉取消息,pull()方法在DefaultMQPullConsumer有具体介绍
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
更新消费组该队列消费进度
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
设置下次拉取消息时间间隔,单位毫秒
context.setPullNextDelayTimeMillis(100);
启动调度组件,调用MQPullConsumerScheduleService.start()方法启动该调度服务。
scheduleService.start();
- 首先初始化队列监听器MessageQueueListenerImpl类,该类是MQPullConsumerScheduleService的内部类,实现了MessageQueueListener接口的messageQueueChanged方法;
- 将该监听器类赋值给DefaultMQPullConsumer.messageQueueListener变量值;
- 调用DefaultMQPullConsumer的start方法启动Consumer;
分析核心执行方法及流程
-
使用registerPullTaskCallback对Topic进行注册
-
MQPullConsumerScheduleService 会将Topic的每个队列以及相应的 doPullTask() 实现放入名为 taskTable 的Hash表中。
-
线程池 scheduledThreadPoolExecutor 会不断的调用每个队列的 doPullTask() 函数。
-
在 doPullTask() 完成自己的拉取消息逻辑,和DefaultMQPullConsumer是一样的。
-
用户设置下次调用间隔时间
-
scheduledThreadPoolExecutor 等待该间隔时间后,再次调用 doPullTask() 方法。
注册拉取任务回调函数
/**
* @param topic topic名称
* @param callback 回调函数
*/
public void registerPullTaskCallback(final String topic, final PullTaskCallback callback)
this.callbackTable.put(topic, callback);
this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
PullTaskCallback回调函数接口
调用MQPullConsumerScheduleService.registerPullTaskCallback (String topic, PullTaskCallback callback)方法,在该方法中以topic为key值将自定义的PullTaskCallback 对象存入MQPullConsumerScheduleService. callbackTable:ConcurrentHashMap<String ,PullTaskCallback>变量中;
public interface PullTaskCallback
/**
*
* @param mq 消息队列
* @param context 任务上下文
*/
void doPullTask(final MessageQueue mq, final PullTaskContext context);
建立PullTaskCallback接口的实现类,实现该接口的doPullTask(final MessageQueue mq, final PullTaskContext context)方法。
在该方法中可以先调用DefaultMQPullConsumer.fetchConsumeOffset (MessageQueue mq, boolean fromStore)方法获取MessageQueue队列的消费进度。
PullTaskContext拉取任务上下文
调用DefaultMQPullConsumer.pull(MessageQueue mq, String subExpression, long offset, int maxNums)方法,
- 指定的队列和指定的开始位置读取消息内容;
- 获取到的消息进行相关的业务逻辑处理;
public class PullTaskContext
private int pullNextDelayTimeMillis = 200;
// 使用该接口进行消息拉取,默认实现是DefaultMQPullConsumer
private MQPullConsumer pullConsumer;
public int getPullNextDelayTimeMillis()
return pullNextDelayTimeMillis;
/**
* 设置下次调用doPullTask()的间隔时间,默认毫秒
*/
public void setPullNextDelayTimeMillis(int pullNextDelayTimeMillis)
this.pullNextDelayTimeMillis = pullNextDelayTimeMillis;
public MQPullConsumer getPullConsumer()
return pullConsumer;
public void setPullConsumer(MQPullConsumer pullConsumer)
this.pullConsumer = pullConsumer;
- 调用DefaultMQPullConsumer.updateConsumeOffset(MessageQueue mq, long offset)方法进行消费进度的更新,其中offset值是在获取消息内容时返回的下一个消费进度值;
MQPullConsumerScheduleService的实现原理
触发拉取消息
RebalanceImpl.rebalanceByTopic()方法执行的过程中,若RebalanceImpl.processQueueTable有变化,则回调DefaultMQPullConsumer. messageQueueListener变量值的MessageQueueListenerImpl. MessageQueueChanged方法,在该方法中调用MQPullConsumerScheduleService. putTask(String topic, Set mqNewSet)方法。
-
若为广播模式(BROADCASTING),则mqNewSet为该topic下面的所有MessageQueue队列;
-
若为集群模式,则mqNewSet为给该topic分配的MessageQueue队列,putTask方法的大致逻辑如下:
-
遍历
MQPullConsumerScheduleService.taskTable: ConcurrentHashMap<MessageQueue, PullTaskImpl>
列表(表示正在拉取消息的任务列表),检查该topic下面的所有MessageQueue对象,若该对象不在入参mqNewSet集合中的,将对应的PullTaskImpl对象的cancelled变量标记为true。 -
mqNewSet集合中的MessageQueue对象,若不在MQPullConsumerScheduleService.taskTable列表中,则以MessageQueue对象为参数初始化PullTaskImpl对象,然后放入taskTable列表中,将该PullTaskImpl对象放入
MQPullConsumerScheduleService.scheduledThreadPoolExecutor
线程池中,然后立即执行该线程。
-
拉取消息的线程(PullTaskImpl)
该PullTaskImpl线程的run方法如下:
-
检查cancelled变量是为true,若为false则直接退出该线程;否则继续下面的处理;
-
以MessageQueue对象的topic值从MQPullConsumerScheduleService.callbackTable变量中获取PullTaskCallback的实现类(该类是由应用层实现);
3, 调用该PullTaskCallback实现类的doPullTask方法,即实现业务层定义的业务逻辑(通用逻辑是先获取消息内容,然后进行相应的业务处理,最后更新消费进度);
4, 再次检查cancelled变量是为true,若不为true,则将该PullTaskImpl对象再次放入MQPullConsumerScheduleService. scheduledThreadPoolExecutor线程池中,设定在200毫秒之后重新调度执行PullTaskImpl线程类;
以上是关于深度剖析 Vue3 的调度系统的主要内容,如果未能解决你的问题,请参考以下文章
Linux(内核剖析):09---进程调度之Linux调度的实现(struct sched_entityschedule())
深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)