分布式任务elastic-job
Posted huonan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式任务elastic-job相关的知识,希望对你有一定的参考价值。
# elastic-job简介
目前Elastic job的最新版本已经由原来的elastic-job-core分离除了两个项目,分别为Elastic-Job-Lite和Elastic-Job-Cloud。Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成,Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。 Elastic-Job-Cloud使用Mesos + Docker(TBD)的解决方案,额外提供资源治理、应用分发以及进程隔离等服务,Elastic-Job-Lite和Elastic-Job-Cloud提供同一套API开发作业,开发者仅需一次开发,即可根据需要以Lite或Cloud的方式部署 .[转自官网:https://github.com/dangdangdotcom/elastic-job/blob/master/README_cn.md]
一般的技术quartz、spring task、java.util.Timer,这几种如果在单一机器上跑其实问题不大,但是如果一旦应用于集群环境做分布式部署,就会带来一个致命的问题,那就是重复执行,当然解决方案有,但是必须依赖数据库,将任务执行状态持久化下来。所以当当就把quartz和zookeeper结合起来达到分布式调度,并且添加其他功能,形成了elastic-job。
elastic-job主要的设计理念是无中心化的分布式定时调度框架,思路来源于Quartz的基于数据库的高可用方案。但数据库没有分布式协调功能,所以在高可用方案的基础上增加了弹性扩容和数据分片的思路,以便于更大限度的利用分布式服务器的资源。
# 功能
1. 主要功能
a) 分布式:重写Quartz基于数据库的分布式功能,改用Zookeeper实现注册中心。
b) 并行调度:采用任务分片方式实现。将一个任务拆分为n个独立的任务项,由分布式的服务器并行执行各自分配到的分片项。
c) 弹性扩容缩容:将任务拆分为n个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服务器加入集群,或现有服务器下线,elastic-job将在保留本次任务执行不变的情况下,下次任务开始前触发任务重分片。
d) 集中管理:采用基于Zookeeper的注册中心,集中管理和协调分布式作业的状态,分配和监听。外部系统可直接根据Zookeeper的数据管理和监控elastic-job。
e) 定制化流程型任务:作业可分为简单和数据流处理两种模式,数据流又分为高吞吐处理模式和顺序性处理模式,其中高吞吐处理模式可以开启足够多的线程快速的处理数据,而顺序性处理模式将每个分片项分配到一个独立线程,用于保证同一分片的顺序性,这点类似于kafka的分区顺序性。
2. 其他功能
a) 失效转移:弹性扩容缩容在下次作业运行前重分片,但本次作业执行的过程中,下线的服务器所分配的作业将不会重新被分配。失效转移功能可以在本次作业运行中用空闲服务器抓取孤儿作业分片执行。同样失效转移功能也会牺牲部分性能。
b) Spring命名空间支持:elastic-job可以不依赖于spring直接运行,但是也提供了自定义的命名空间方便与spring集成。
c) 运维平台:提供web控制台用于管理作业。
下载源码
https://github.com/elasticjob
#spring boot quick start
Add maven dependency
<!-- https://mvnrepository.com/artifact/com.dangdang/elastic-job-lite-core --> <!-- import elastic-job lite core --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.6</version> </dependency> <!-- import other module if need --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.6</version> </dependency>
RegCenter configuration
@Configuration public class RegistryCenterConfig { @Bean(initMethod = "init") public ZookeeperRegistryCenter regCenter() { String serverList = ConfigOne.getProperty(ConfigConstants.JOB_REG_ZK); String namespace = ConfigOne.getProperty(ConfigConstants.JOB_REG_NS); return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace)); } }
Job configuration
@Configuration public class SimpleJobConfig { //private String defaultCron = "0/5 * * * * ?"; //默认每天0点10分开始统计 @Value("${job.default.cron}") private String defaultCron = "0 10 0 * * ?"; @Value("${job.default.shardingTotalCount}") private int defaultShardTotal = 1; @Value("${job.default.shardingItemParameters}") private String defaultShardPrams = ""; @Resource private ZookeeperRegistryCenter regCenter; /*@Resource private JobEventConfiguration jobEventConfiguration;*/ @Resource private StatsDeviceJob statsDeviceJob; @Resource private StatsUserJob statsUserJob; @Resource private StatsDeviceFaultJob statsDeviceFaultJob; @Resource private StatsDeviceAlarmJob statsDeviceAlarmJob; @Resource private StatsRuleJob statsRuleJob; @Resource private StatsYumairJob statsYumairJob; //@Bean(initMethod = "init") @PostConstruct public void init() { //statsDeviceJob new SpringJobScheduler(statsDeviceJob, regCenter, getLiteJobConfiguration(statsDeviceJob.getClass(), defaultCron, defaultShardTotal, defaultShardPrams)).init(); //statsUserJob new SpringJobScheduler(statsUserJob, regCenter, getLiteJobConfiguration(statsUserJob.getClass(), defaultCron, defaultShardTotal, defaultShardPrams)).init(); //statsDeviceFaultJob new SpringJobScheduler(statsDeviceFaultJob, regCenter, getLiteJobConfiguration(statsDeviceFaultJob.getClass(), defaultCron, defaultShardTotal, defaultShardPrams)).init(); //statsDeviceAlarmJob new SpringJobScheduler(statsDeviceAlarmJob, regCenter, getLiteJobConfiguration(statsDeviceAlarmJob.getClass(), defaultCron, defaultShardTotal, defaultShardPrams)).init(); //statsRuleJob new SpringJobScheduler(statsRuleJob, regCenter, getLiteJobConfiguration(statsRuleJob.getClass(), defaultCron, defaultShardTotal, defaultShardPrams)).init(); //statsYumairJob 每小时5分运行一次 new SpringJobScheduler(statsYumairJob, regCenter, getLiteJobConfiguration(statsYumairJob.getClass(), "0 5 * * * ?", defaultShardTotal, defaultShardPrams)).init(); } private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int defaultShardTotal, final String defaultShardPrams) { return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder( jobClass.getName(), cron, defaultShardTotal).shardingItemParameters(defaultShardPrams).build(), jobClass.getCanonicalName())).overwrite(true).build(); } }
Job development
@Service public class StatsDeviceJob implements SimpleJob { private static Logger logger = LoggerFactory.getLogger(StatsDeviceJob.class); @Autowired private DeviceStatsFeignClient deviceStatsFeignClient; @Autowired private DeviceDataFeignClient deviceDataFeignClient; @Autowired private IDataStatsFacade dataStatsFacade; /** * 1.当分片数为1时,在同一个zookepper和jobname情况下,多台机器部署了Elastic job时, * 只有拿到shardingContext.getShardingItem()为0的机器得以执行,其他的机器不执行 * 2.当分片数大于1时,假如有3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 * 此时每台服务器可根据拿到的shardingItem值进行相应的处理 * 目前job分片数全部置为1,即不使用分片 * @param shardingContext */ @Override public void execute(ShardingContext shardingContext) { logger.info(String.format("ShardingItem: %s | Thread: %s | %s", shardingContext.getShardingItem(), Thread.currentThread().getId(), "SIMPLE")); deviceStatsJob(); } public void deviceStatsJob() { //TODO } }
# 运维平台和RESTFul API部署(可选)
1. 下载或者克隆elastic-job源码
地址:https://github.com/dangdangdotcom/elastic-job
2. maven编译安装
进入到elastic-job目录,按住Shift+鼠标右键,选择“在此处打开命令窗口(W)”,执行如下命令:
```
mvn clean install -Dmaven.test.skip=true
```
```
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] elastic-job ....................................... SUCCESS [23.570s]
[INFO] elastic-job-common ................................ SUCCESS [0.053s]
[INFO] elastic-job-common-core ........................... SUCCESS [27.108s]
[INFO] elastic-job-common-restful ........................ SUCCESS [29.844s]
[INFO] elastic-job-lite .................................. SUCCESS [0.078s]
[INFO] elastic-job-lite-core ............................. SUCCESS [7.249s]
[INFO] elastic-job-lite-lifecycle ........................ SUCCESS [3.766s]
[INFO] elastic-job-lite-spring ........................... SUCCESS [1:08:42.613s
]
[INFO] elastic-job-lite-console .......................... SUCCESS [2:31.964s]
[INFO] elastic-job-cloud ................................. SUCCESS [0.031s]
[INFO] elastic-job-cloud-executor ........................ SUCCESS [3.728s]
[INFO] elastic-job-cloud-scheduler ....................... SUCCESS [33.803s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1:13:24.136s
[INFO] Finished at: Sat Dec 02 18:33:33 CST 2017
[INFO] Final Memory: 55M/272M
[INFO] ------------------------------------------------------------------------
```
3. 解压上一步打好的包
路径:elastic-job\elastic-job-lite\elastic-job-lite-console\target\elastic-job-lite-console-2.1.6.tar.gz
elastic-job-lite-console-2.1.6\bin目录下是启动脚本
windows环境用:start.bat
linux环境用:start.sh
elastic-job-lite-console-2.1.6\conf目录下是配置文件auth.properties,配置的用户名和密码
4. 解压缩elastic-job-lite-console-${version}.tar.gz并执行bin\start.sh。
5. 打开浏览器访问http://localhost:8899/即可访问控制台。8899为默认端口号,可通过启动脚本输入-p自定义端口号。
6. 访问RESTFul API方法同控制台。
以上是关于分布式任务elastic-job的主要内容,如果未能解决你的问题,请参考以下文章