elastic-job静态任务与动态任务实战
Posted 杨 戬
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了elastic-job静态任务与动态任务实战相关的知识,希望对你有一定的参考价值。
文章目录
elastic-job
分布式任务调度
概念
很多时候,我们需要定时执行一些程序完成一些预定要完成的操作,如果手动处理,一旦任务量过大,就非常麻烦,所以用定时任务去操作是个非常不错的选项,这种操作就是分布式任务调度。
分类
现在的应用多数是分布式或者微服务,所以我们需要的是分布式任务调度,那么现在分布式任务调度流行的主要有elastic-job、xxl-job、quartz等,我们这里做一个对比:
feature | quartz | elastic-job | xxl-job | antares | opencron |
---|---|---|---|---|---|
依赖 | mysql | jdk1.7+, zookeeper 3.4.6+ ,maven3.0.4+ | mysql ,jdk1.7+ , maven3.0+ | jdk 1.7+ , redis , zookeeper | jdk1.7+ , Tomcat8.0+ |
HA | 多节点部署,通过竞争数据库锁来保证只有一个节点执行任务 | 通过zookeeper的注册与发现,可以动态的添加服务器。 支持水平扩容 | 集群部署 | 集群部署 | — |
任务分片 | — | 支持 | 支持 | 支持 | — |
文档完善 | 完善 | 完善 | 完善 | 文档略少 | 文档略少 |
管理界面 | 无 | 支持 | 支持 | 支持 | 支持 |
难易程度 | 简单 | 简单 | 简单 | 一般 | 一般 |
公司 | OpenSymphony | 当当网 | 个人 | 个人 | 个人 |
高级功能 | — | 弹性扩容,多种作业模式,失效转移,运行状态收集,多线程处理数据,幂等性,容错处理,spring命名空间支持 | 弹性扩容,分片广播,故障转移,Rolling实时日志,GLUE(支持在线编辑代码,免发布),任务进度监控,任务依赖,数据加密,邮件报警,运行报表,国际化 | 任务分片, 失效转移,弹性扩容 , | 时间规则支持quartz和crontab ,kill任务, 现场执行,查询任务运行状态 |
使用企业 | 大众化产品,对分布式调度要求不高的公司大面积使用 | 36氪,当当网,国美,金柚网,联想,唯品会,亚信,平安,猪八戒 | 大众点评,运满满,优信二手车,拍拍贷 | — | — |
elastic-job简介
中文官网:https://shardingsphere.apache.org/elasticjob/index_zh.html
ElasticJob 是一个分布式调度解决方案,由 2 个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成。
ElasticJob-Lite 定位为轻量级无中心化解决方案,使用jar的形式提供分布式任务的协调服务;ElasticJob-Cloud 使用 Mesos 的解决方案,额外提供资源治理、应用分发以及进程隔离等服务。
ElasticJob 的各个产品使用统一的作业 API,开发者仅需要一次开发,即可随意部署。
下载安装使用
可以参考官方文档:https://shardingsphere.apache.org/elasticjob/current/cn/overview/
实战案例
静态任务案例
使用elastic-job很容易,我们接下来学习下elastic-job的使用,这里的案例我们先实现静态任务案例,静态任务案例也就是执行时间事先写好。
实现步骤:
- 引入依赖包
- 配置zookeeper节点以及任务名称命名空间
- 实现自定义任务,需要实现SimpleJob接口
1)在seckill-goods
中引入依赖
<!-- ElasticJobAutoConfiguration自动配置类作用-->
<dependency>
<groupId>com.github.kuhn-he</groupId>
<artifactId>elastic-job-lite-spring-boot-starter</artifactId>
<version>2.1.5</version>
</dependency>
2)配置elastic-job
在bootstrap.yml
中配置elastic-job
,如下:
elaticjob:
zookeeper:
server-lists: zk-server:3181
namespace: updatetask
配置说明:
server-lists:
zookeeper的地址namespace
:定时任务命名空间
3)任务创建
创建com.seckill.goods.task.statictask.ElasticjobTask
,代码如下:
@ElasticSimpleJob(
cron = "5/10 * * * * ?",
jobName = "updateTask",
shardingTotalCount = 1
)
@Component
public class ElasticjobTask implements SimpleJob
/***
* 执行任务
* @param shardingContext
*/
@Override
public void execute(ShardingContext shardingContext)
System.out.println("-----------执行!");
参数说明:
- cron:定时表达式
- jobName:这里和bootstrap.yml中的namespace保持一致
- shardingTotalCount:分片数量
动态任务案例
项目参考地址:https://github.com/LuoLiangDSGA/spring-learning/tree/master/boot-elasticjob
动态任务案例主要是讲解程序在运行时,动态添加定时任务,这种场景应用非常广泛。使用elastic-job实现动态添加定时任务的实现有点复杂,我们接下来实际操作一次。
实现步骤:
- 配置初始化的zookeeper地址
- 配置的定时任务命名空间(不一定会使用)
- 注册初始化数据
- 监听器->任务执行前后监听(可有可无)
- 动态添加定时任务实现
- 自定义任务处理过程-实现SimpleJob
1)监听器创建
监听器采用AOP模式,类似前置通知和后置通知,doBeforeJobExecutedAtLastStarted
和doAfterJobExecutedAtLastCompleted
分别会在任务执行前和执行后调用,我们创建一个监听器实现任务调度前后拦截,com.seckill.goods.task.dynamic.ElasticJobListener
:
public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener
/****
* 构造函数
* @param startedTimeoutMilliseconds
* @param completedTimeoutMilliseconds
*/
public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds)
super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
/***
* 任务初始化前要做的事情,类似前置通知
* @param shardingContexts
*/
@Override
public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts)
System.out.println("========doBeforeJobExecutedAtLastStarted========"+ TimeUtil.date2FormatHHmmss(new Date()));
/***
* 任务执行完成后要做的事情,类似后置通知
* @param shardingContexts
*/
@Override
public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts)
System.out.println("=======doAfterJobExecutedAtLastCompleted============="+ TimeUtil.date2FormatHHmmss(new Date()));
2)注册中心配置
在bootstrap.yml中配置zk和namespace
#配置动态任务案例的zk和namespace
zk: zk-server:3181
namesp: autotask
创建配置类配置注册中心信息,com.seckill.goods.task.dynamic.ElasticJobConfig
:
@Configuration
public class ElasticJobConfig
//配置文件中的zookeeper的ip和端口
@Value(value = "$zk")
private String serverlists;
//指定一个命名空间
@Value("$namesp")
private String namespace;
/***
* 配置Zookeeper和namespace
* @return
*/
@Bean
public ZookeeperConfiguration zkConfig()
return new ZookeeperConfiguration(serverlists, namespace);
/***
* 向zookeeper注册初始化信息
* @param config
* @return
*/
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config)
return new ZookeeperRegistryCenter(config);
/****
* 创建ElasticJob的监听器实例
* @return
*/
@Bean
public ElasticJobListener elasticJobListener()
//初始化要给定超时多少秒重连
return new ElasticJobListener(100L,100L);
3)任务构建
我们创建一个动态配置任务的类,任何逻辑代码需要创建定时任务,可以直接调用该类的指定方法即可。创建类:com.seckill.goods.task.dynamic.ElasticJobHandler
,代码如下:
@Component
public class ElasticJobHandler
@Resource
private ZookeeperRegistryCenter registryCenter;
@Resource
private ElasticJobListener elasticJobListener;
/**
* @param jobName:任务的命名空间
* @param jobClass:执行的定时任务对象
* @param shardingTotalCount:分片个数
* @param cron:定时周期表达式
* @param id:自定义参数
* @return
*/
private static LiteJobConfiguration.Builder simpleJobConfigBuilder(String jobName,
Class<? extends SimpleJob> jobClass,
int shardingTotalCount,
String cron,
String id)
//创建任务构建对象
LiteJobConfiguration.Builder builder = LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
JobCoreConfiguration.
//任务命名空间名字、任务执行周期表达式、分片个数
newBuilder(jobName, cron, shardingTotalCount).
//自定义参数
jobParameter(id).
build(),
jobClass.getCanonicalName()));
//本地配置是否可覆盖注册中心配置
builder.overwrite(true);
return builder;
/**
* 添加一个定时任务
* @param cron:周期执行表达式
* @param id:自定义参数
* @param jobName:命名空间
* @param instance:任务对象
*/
public void addPublishJob(String cron,String id,String jobName,SimpleJob instance)
LiteJobConfiguration jobConfig = simpleJobConfigBuilder(
jobName,
instance.getClass(),
1,
cron,
id).overwrite(true).build();
//DynamicTask为具体的任务执行逻辑类
new SpringJobScheduler(instance, registryCenter, jobConfig, elasticJobListener).init();
/***
* Date转cron表达式
*/
public static final String CRON_DATE_FORMAT = "ss mm HH dd MM ? yyyy";
/**
* 获得定时
* @param date
* @return
*/
public static String getCron(final Date date)
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(CRON_DATE_FORMAT);
return simpleDateFormat.format(date);
4)执行逻辑
我们接着创建一个类,用于执行自己所需要操作的逻辑,com.seckill.goods.task.dynamic.DynamicTask
,代码如下:
public class DynamicTask implements SimpleJob
@Override
public void execute(ShardingContext shardingContext)
//传递的参数
String id = shardingContext.getJobParameter();
try
//具体任务逻辑
System.out.println("执行你的逻辑代码!param:"+id);
catch (Exception e)
e.printStackTrace();
5)调用测试
创建com.seckill.goods.controller.TaskController
动态调用创建任务的方法,代码如下:
@RestController
@RequestMapping(value = "/task")
public class TaskController
@Autowired
ElasticJobHandler elasticJobHandler;
/***
* 动态创建任务
* @param times:延迟时间,为了测试到效果,所以在当前时间往后延迟
* @param jobname:任务名字
* @param param:自定义参数
* @return
*/
@GetMapping
public Result add(Long times,String jobname,String param)
//在当前指定时间内延迟times毫秒执行任务
Date date = new Date(System.currentTimeMillis()+times);
//需要传递给定时任务的参数
String cron = ElasticJobHandler.getCron(date);
//执行任务
elasticJobHandler.addPublishJob(cron,param,jobname,new DynamicTask());
return new Result(true, StatusCode.OK,"添加任务成功!");
6)测试
访问:http://localhost:18081/task?times=15000&jobname=asyncname¶m=No001
后台执行效果如下:
以上是关于elastic-job静态任务与动态任务实战的主要内容,如果未能解决你的问题,请参考以下文章