看完这篇文章,还敢说自己不会分布式任务调度?
Posted XiaoLin__Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了看完这篇文章,还敢说自己不会分布式任务调度?相关的知识,希望对你有一定的参考价值。
一、概述
1.1、什么是任务调度
我们可以思考一下下面业务场景的解决方案:
- 某电商平台需要每天上午10点,下午3点,晚上8点发放一批优惠券
- 某银行系统需要在信用卡到期还款日的前三天进行短信提醒
- 某财务系统需要在每天凌晨0:10分结算前一天的财务数据,统计汇总
以上场景就是任务调度所需要解决的问题,任务调度是为了自动完成特定任务,在约定的特定时刻去执行任务的过程。
在Spring中也提供了定时任务注解@Scheduled
。我们只需要在业务中贴上注解然后在启动类上贴上@EnableScheduling
注解即可完成任务调度功能。
@Scheduled(cron = "0/20 * * * * ? ") // 每隔20秒执行一次
public void doWork(){
//doSomething
}
1.2、分布式调度出现
感觉Spring给我们提供的这个注解可以完成任务调度的功能,好像已经完美解决问题了,为什么还需要分布式呢?主要的原因有以下几点:
- 机处理极限:原本1分钟内需要处理1万个订单,但是现在需要1分钟内处理10万个订单;原来一个统计需要1小时,现在业务方需要10分钟就统计出来。你也许会说,你也可以多线程、单机多进程处理。的确,多线程并行处理可以提高单位时间的处理效率,但是单机能力毕竟有限(主要是CPU、内存和磁盘),始终会有单机处理不过来的情况。
- 高可用:单机版的定式任务调度只能在一台机器上运行,如果程序或者系统出现异常就会导致功能不可用。虽然可以在单机程序实现的足够稳定,但始终有机会遇到非程序引起的故障,而这个对于一个系统的核心功能来说是不可接受的。
- 防止重复执行: 在单机模式下,定时任务是没什么问题的。但当我们部署了多台服务,同时又每台服务又有定时任务时,若不进行合理的控制在同一时间,只有一个定时任务启动执行,这时,定时执行的结果就可能存在混乱和错误了。
1.3、Elastic-Job
Elastic-Job是一个分布式调度的解决方案,由当当网开源,它由两个相互独立的子项目Elastic-job-Lite和Elastic-Job-Cloud组成,使用Elastic-Job可以快速实现分布式任务调度。Elastic-Job的github地址。他的功能主要是:
-
分布式调度协调
在分布式环境中,任务能够按照指定的调度策略执行,并且能够避免同一任务多实例重复执行。
-
丰富的调度策略:
基于成熟的定时任务作业框架Quartz cron表达式执行定时任务。
-
弹性拓容缩容
当集群中增加一个实例,它应当能够被选举被执行任务;当集群减少一个实例时,他所执行的任务能被转移到别的示例中执行。
-
失效转移
某示例在任务执行失败后,会被转移到其他实例执行。
-
错过执行任务重触发
若因某种原因导致作业错过执行,自动记录错误执行的作业,并在下次次作业完成后自动触发。
-
支持并行调度
支持任务分片,任务分片是指将一个任务分成多个小任务在多个实例同时执行。
-
作业分片一致性
当任务被分片后,保证同一分片在分布式环境中仅一个执行实例。
-
支持作业生命周期操作
可以动态对任务进行开启及停止操作。
-
丰富的作业类型
支持Simple、DataFlow、Script三种作业类型
1.4、启动zookeeper
- 解压包
- 到conf目录中把oo_sample.cfg 拷贝一份 , 修改名字为zoo.cfg。
- 到bin目录中启动startup.cmd文件(用1命令行启动)。
1.4、启动zookeeper图形化界面
- 解压。
- 到1build目录中,找到jar包。
- 使用命令:java -jar jar包的名字
二、Elastic-Job快速入门
Elastic-Job的1环境要求:
- JDK 要求1.7以上保本
- Maven 要求3.0.4及以上版本
- Zookeeper 要求采取3.4.6以上版本
2.1、环境搭建
安装运行zookeeper
创建一个maven项目并导入依赖
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
写任务类
public class XiaoLinJob implements SimpleJob {
// 写任务类
@Override
public void execute(ShardingContext shardingContext) {
System.out.println("定时任务开始");
}
}
编写配置类
public class JobDemo {
public static void main(String[] args) {
new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
//配置zk地址,调度任务的组名
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181", "elastic-job-demo");
zookeeperConfiguration.setSessionTimeoutMilliseconds(1000);
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
regCenter.init();
return regCenter;
}
private static LiteJobConfiguration createJobConfiguration() {
// 定义作业核心配置
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob","0/1 * * * * ?",1).build();
// 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, XiaoLinJob.class.getCanonicalName());
// 定义Lite作业根配置
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
return simpleJobRootConfig;
}
}
2.2、测试
- 当只有一台启动的时候,按照corn表达式进行任务调度。
- 开启两台机器的1时候,新开的一台会继续执行定时任务,旧的1那一台会停止。
三、SpringBoot集成Elastic-Job
3.1、引入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
3.2、编写配置文件
application.yaml
因为配置中心的地址不是固定的,所以我们不能写死在代码中,需要把他写在配置文件中。新建一个配置文件:
elasticjob:
zookeeper-url: localhost:2181
group-name: elastic-job-group
zookeeper注册中心配置类
// 注册中心的配置类
@Configuration
public class RegistryCenterConfig {
@Bean(initMethod = "init")
// 从配置文件中获取注册中心的的url和命名空间
public CoordinatorRegistryCenter coordinatorRegistryCenter(
@Value("${elasticjob.zookeeper-url}") String zookeeperUrl,
@Value("${elasticjob.group-name}") String namespace){
// zk的配置
ZookeeperConfiguration zookeeperConfiguration =
new ZookeeperConfiguration(zookeeperUrl,namespace);
// 设置超时时间
zookeeperConfiguration.setMaxSleepTimeMilliseconds(10000000);
// 创建注册中心
ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
return zookeeperRegistryCenter;
}
}
任务调度的配置类
@Configuration
public class JobConfig {
@Autowired
XiaoLinJob xiaoLinJob;
@Autowired
private CoordinatorRegistryCenter registryCenter;
private static LiteJobConfiguration createJobConfiguration(
final Class<? extends SimpleJob> jobClass, // 任务的名字
final String cron, // cron表达式
final int shardingTotalCount, // 分片的数量
final String shardingItemParameters // 分片类信奉的参数
){
JobCoreConfiguration.Builder jobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getSimpleName(),cron,shardingTotalCount);
if(!StringUtils.isEmpty(shardingItemParameters)){
jobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
}
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(jobCoreConfigurationBuilder.build(), XiaoLinJob.class.getCanonicalName());
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
return simpleJobRootConfig;
}
@Bean(initMethod = "init")
public SpringJobScheduler initSimpleElasticJob(){
SpringJobScheduler springJobScheduler = new SpringJobScheduler(xiaoLinJob,registryCenter,createJobConfiguration(XiaoLinJob.class,"0/3 * * * * ?",1,null));
return springJobScheduler;
}
}
自定义任务类
@Component
public class XiaoLinJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println("============");
}
}
3.3、测试
四、小案例
4.1、单机版本
4.1.1、需求描述
数据库中有一些列的数据,需要对这些数据进行备份操作,备份完之后,修改数据的状态,标记已经备份了。
4.1.2、创建数据库
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for t_file_custom
-- ----------------------------
DROP TABLE IF EXISTS `t_file_custom`;
CREATE TABLE `t_file_custom` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`content` varchar(255) DEFAULT NULL,
`type` varchar(255) DEFAULT NULL,
`backedUp` tinyint(4) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of t_file_custom
-- ----------------------------
INSERT INTO `t_file_custom` VALUES ('1', '文件1', '内容1', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('2', '文件2', '内容2', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('3', '文件3', '内容3', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('4', '文件4', '内容4', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('5', '文件5', '内容5', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('6', '文件6', '内容6', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('7', '文件6', '内容7', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('8', '文件8', '内容8', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('9', '文件9', '内容9', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('10', '文件10', '内容10', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('11', '文件11', '内容11', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('12', '文件12', '内容12', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('13', '文件13', '内容13', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('14', '文件14', '内容14', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('15', '文件15', '内容15', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('16', '文件16', '内容16', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('17', '文件17', '内容17', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('18', '文件18', '内容18', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('19', '文件19', '内容19', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('20', '文件20', '内容20', 'vedio', '1');
4.1.3、Druid&MyBatis
4.1.3.1、添加依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.2.0</version>
</dependency>
<!--mysql驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
4.1.3.2、集成数据库
spring:
datasource:
url: jdbc:mysql://localhost:3306/elastic-job-demo?serverTimezone=GMT%2B8
driverClassName: com.mysql.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
username: root
password: admin
4.1.4、添加实体类
@Data
public class FileCustom {
//唯一标识
private Long id;
//文件名
private String name;
//文件类型
private String type;
//文件内容
private String content;
//是否已备份
private Boolean backedUp = false;
public FileCustom(){}
public FileCustom(Long id, String name, String type, String content){
this.id = id;
this.name = name;
this.type = type;
this.content = content;
}
}
4.1.5、添加任务类
@Slf4j
@Component
public class FileCustomElasticJob implements SimpleJob {
@Autowired
FileCopyMapper fileCopyMapper;
@Override
public void execute(ShardingContext shardingContext) {
doWork();
}
private void doWork() {
List<FileCustom> fileCustoms = fileCopyMapper.selectAll();
if (fileCustoms.size() == 0){
log.info("备份完成");
return;
}
log.info("需要备份的文件个数为:"+fileCustoms.size());
for (FileCustom fileCustom : fileCustoms) {
backUpFile(fileCustom);
}
}
private void backUpFile(FileCustom fileCustom) {
try {
Thread.sleep(1000);
log.info("执行备份文件:"+fileCustom);
fileCopyMapper.backUpFile(fileCustom.getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4.1.6、添加Mapper
@Mapper
public interface FileCopyMapper {
@Select("select * from t_file_custom where backedUp = 0")
List<FileCustom> selectAll();
@Update("update t_file_custom set backedUp = 1 where id = #{id}")
void backUpFile(Long id);
}
4.1.7、添加任务调度配置
@Bean(initMethod = "init")
public SpringJobScheduler initFileCustomElasticJob(FileCustomElasticJob fileCustomElasticJob){
SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileCustomElasticJob,registryCenter,createJobConfiguration(XiaoLinJob.class,"0/3 * * * * ?",1,null));
return springJobScheduler;
}
4.1.8、存在的问题
为了高可用,我们会对这个项目做集群的操作,可以保证其中一台挂了,另外一台可以继续工作.但是在集群的情况下,调度任务只在一台机器上运行,如果单个任务调度比较耗时,耗资源的情况下,对这台机器的消耗还是比较大的。
但是这个时候,其他机器却是空闲着的,如何合理的利用集群的其他机器且如何让任务执行得更快些呢?这时候Elastic-Job提供了任务调度分片的功能。
4.2、集群版本
4.2.1、分片概念
作业分片是指任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或者几个分布项。
例如在单机版本的备份数据的案例,如果有两台服务器,每台服务器分别跑一个应用实例。为了快速执行作业,那么可以讲任务分成4片,每个应用实例都执行两片。作业遍历数据逻辑应为:实例1查找text和image类型文件执行备份,实例2查找radio和vedio类型文件执行备份。
如果由于服务器拓容应用实例数量增加为4,则作业遍历数据
以上是关于看完这篇文章,还敢说自己不会分布式任务调度?的主要内容,如果未能解决你的问题,请参考以下文章
GitHub上标星75k+超牛的华为内部首次公开源码笔记,看完你还敢说你不会SpringBoot?
京东T6:这份《Java 异步编程实战》文档看完还敢说不会?