如何构建延迟任务调度系统
Posted cnndevelop
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何构建延迟任务调度系统相关的知识,希望对你有一定的参考价值。
一、需求目标
1.需求描述
之前笔者接触过一些营销业务场景,比如说:
用户注册未登录过APP第二天早上10点发一条营销短信促活
红包过期前两天短信通知,下午16:00发送
等等定时任务处理业务。
采用的技术方案是定时任务扫数据汇总表,分页读取一定数量然后处理
然而随着业务的发展,业务多元化,遇到了以下场景:
拼团砍价活动过期前半小时提醒
订单提交半小时内没有完成支付,订单自动取消,库存退还
用户几天内没有操作过系统,发放激活短信
以上场景处理时间不是固定的某个点,而是业务发生的时间推迟一段时间,针对以上的业务场景,我们考虑可以根据不同业务建表,然后每隔一段时间去定时扫表,各自处理业务。
但是随着业务增加,表泛滥,而且此类业务其实有很多相同的地方,那么我们可以考虑把相同逻辑抽离出来,利用延迟队列来处理任务
2.延时队列设计目标
可靠性:任务进入延时队列之后,必须被执行一次
高可用性:支持多实例部署
实时性:允许一定时间误差,当然误差越小越好
可管理:支持消息删除
高性能:数据量大的情况下也能保证高性能
二、技术调研
延时队列实现的几种方式
java.util.Timer + java.util.TimerTask
java.util.concurrent.ScheduledExecutorService
Quartz
java.util.concurrent.DelayQueue
数据库轮询
redis过期键通知
rocketMQ中的延时队列
1. Timer+TimerTask
使用 Timer 实现任务调度的核心类是 Timer 和 TimerTask。其中 Timer 负责设定 TimerTask 的起始与间隔执行时间。使用者只需要创建一个 TimerTask 的继承类,实现自己的 run 方法,然后将其丢给 Timer 去执行即可
Timer 的设计核心是一个 TaskList 和一个 TaskThread。Timer 将接收到的任务丢到自己的 TaskList 中,TaskList 按照 Task 的最初执行时间进行排序。TimerThread 在创建 Timer 时会启动成为一个守护线程。这个线程会轮询所有任务,找到一个最近要执行的任务,然后休眠,当到达最近要执行任务的开始时间点,TimerThread 被唤醒并执行该任务。之后 TimerThread 更新最近一个要执行的任务,继续休眠。
实现思想:应用维护一个全局的Timer调度器,延时任务实现TimerTask,run方法中实现逻辑。计算好具体的延迟执行时间,交给Timer去调度。
选型评估:简单易用,但是缺点较多,单线程调度,所有任务都是串行的,性能低,前一个任务的延迟或异常都将会影响到之后的任务,影响实时性,同时也不具备延时队列的几点能力
3.2 ScheduledExecutorService
基于Timer的缺陷,JDK5推出了基于线程池设计的 ScheduledExecutor,原理是
每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。需要注意的是,只有当任务的执行时间到来时,ScheduedExecutor 才会真正启动一个线程,其余时间 ScheduledExecutor 都是在轮询任务的状态。
选型评估:引入多线程,解决了Timer的一些缺点,但是只适合单机,分布式环境不支持。也不具备延时队列的几点能力,需要考虑跟别的技术结合使用评估是否可以满足延时队列的能力。
3.3 Quartz
Quartz是个轻量级的任务调度框架,可以跟多个应用集成,并且具有容错机制,重启服务的时候内存中丢失的任务可以被持久化
选型评估:
Quartz满足了我们需要的延时队列的可靠性: 持久化任务,避免了服务重启的时候内存中的任务丢失,高可用:执行任务的节点挂了,另外的节点会继续执行
集群分布式并发环境中使用QUARTZ定时任务调度,会在各个节点会上报任务,存到数据库中,执行时会从数据库中取出触发器来执行,如果触发器的名称和执行时间相同,则只有一个节点去执行此任务,Quartz的任务触发只能在单个节点运行,其它节点不执行任务,性能低,浪费资源
3.4 DelayQueue
DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。
选型评估:
效率高,任务触发时间延迟低
适用于单机,需要结合其他技术运用,
数据是保存在内存,需要自己实现持久化
不具备分布式能力,需要自己实现高可用
3.5 数据库轮询
每隔一段时间去查询数据库,处理好的记录标记状态
选型评估:定期轮询数据量大的时候会消耗太多IO资源,效率低
3.6 redis过期键通知
需要DBA做一些额外的配置,开启这个功能
选型评估:Redis的发布/订阅目前是即发即弃(fire and forget)模式的,因此无法实现事件的可靠通知。如果发布/订阅的客户端断链之后又重连,则在客户端断链期间的所有事件都丢失了
7. rocketMQ中的延时队列
选型评估:rocketMQ中消息延迟时间为固定时间段:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,粒度不够,不能很好支持业务
总结
延时队列的技术点还有很多,比如说时间轮之类的方案,要满足延时队列的几点特性,实现高可用,可靠性,我们需要结合多个技术去实现。
三、架构设计
功能设计
系统功能:
延迟任务调度系统提供统一的任务操作接口给业务方调用,业务方可以提交任务,取消任务,查询任务状态。
调度服务属于底层应用,因此采用MQ的方式解耦,所有触发的延迟任务都通过消息的方式发送给业务消费方,
由消费方控制流量,业务幂等。同时也保证了任务的重试机制。
采用技术:elastic-job + db + delayQueue + mq
整体架构
业务调用方
业务方在需要延迟任务的时候调用延迟任务服务操作任务
触发的延迟任务会放到MQ消息队列里面,由业务方自行消费
业务方消费消息处理完成之后,调用延迟任务服务通知处理结果
延迟任务节点
以dubbo方式提供延迟任务接口供业务方操作,用于添加延迟任务,取消任务,反馈任务处理结果。
集成elastic-job提供数据分片功能,每个节点按照对应分片从数据库加载即将触发的延迟任务放到内存中
任务调度触发的延迟任务发送到MQ消息队列中
接收业务调用的延迟消息处理结果反馈
Zookeeper
elastic-job注册中心,存储作业信息
elastic-job
高可用的分布式任务调度系统
注册任务实例信息和分片信息到zk上
数据分片
elastic-job作业数据分片
节点添加/删除,主节点选举,重新分片
任务加载作业
由elastic-job实现,使用数据分片功能,提升系统总吞吐量
将未来N分钟内要触发的任务加载到内存中
任务在内存中的存储和调度
任务加载作业将未来N分钟内触发的任务加载到内存队列DelayQueue
任务调度依靠DelayQueue精确触发
数据库
延迟任务持久化,存储任务数据
延迟任务状态
INIT(1, "初始化"), LOAD(2, "任务已加载"), SENDING(3, "消息已发放"), SUCCESS(4, "业务处理成功"), FAIL(5, "业务处理失败"), CANCEL(6, "业务取消");
数据库设计
CREATE TABLE `delay_task` ( `delay_task_id` bigint(20) NOT NULL COMMENT ‘任务ID‘, `sharding_id` tinyint(4) NOT NULL COMMENT ‘分片ID‘, `topic` varchar(100) NOT NULL COMMENT ‘消息topic‘, `tag` varchar(100) NOT NULL COMMENT ‘消息tag‘, `params` varchar(1000) NOT NULL COMMENT ‘参数‘, `trigger_time` bigint(19) NOT NULL COMMENT ‘执行时间‘, `status` tinyint(4) NOT NULL COMMENT ‘任务状态:1.初始化 2.任务已加载 3.消息已发放 4.业务处理成功 5.业务处理失败‘, `extend_field` varchar(100) NOT NULL COMMENT ‘扩展属性‘, `create_time` bigint(20) NOT NULL COMMENT ‘创建时间‘, `op_time` bigint(20) NOT NULL COMMENT ‘最近一次更新时间‘, `last_ver` int(10) NOT NULL COMMENT ‘版本号‘, `is_valid` tinyint(2) NOT NULL DEFAULT ‘1‘ COMMENT ‘是否有效 0-失效 1-有效‘, PRIMARY KEY (`delay_task_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘延迟任务表‘
数据库设计就一张表delay_task,用来存储延迟任务的数据,包括业务方要消费的消息的tag,topic,以及消息体内容
源码:https://github.com/caisl/delay-task
以上是关于如何构建延迟任务调度系统的主要内容,如果未能解决你的问题,请参考以下文章