Springboot整合Elastic-Job

Posted xmz_java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Springboot整合Elastic-Job相关的知识,希望对你有一定的参考价值。

Elastic-Job是当当网的任务调度开源框架,有以下功能

分布式调度协调
弹性扩容缩容
失效转移
错过执行作业重触发
作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
自诊断并修复分布式不稳定造成的问题
支持并行调度
支持作业生命周期操作
丰富的作业类型
Spring整合以及命名空间提供
运维平台

具体信息可以查看 官网 ,Elastic-Job的文档很详细,同时也有相应的demo。但是,美中不足的是他的springboot版本的demo用的是xml结构的。网上的例子都有点乱,花了点时间整合了下,现在就开始吧。

 

快速入门


 1.pom文件

        <elastic-job.version>2.1.5</elastic-job.version>

<dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>${elastic-job.version}</version>
        </dependency>
        <!-- elastic-job-lite-spring -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>${elastic-job.version}</version>
        </dependency>

2.定义zookeeper

@Configuration
public class ElasticRegCenterConfig {
    /**
     * 配置zookeeper
     * @param serverList
     * @param namespace
     * @return
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(
            @Value("${regCenter.serverList}") final String serverList,
            @Value("${regCenter.namespace}") final String namespace) {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }
}

3.定义job

@Component
public class SimpleJobDemo implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(String.format("------Thread ID: %s, 任務總片數: %s, " +
                        "当前分片項: %s.当前參數: %s," +
                        "当前任務名稱: %s.当前任務參數: %s"
                ,
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()

        ));
    }
}

4.定义任务监听器,统计每次任务执行的时间

public class MyElasticJobListener implements ElasticJobListener {
    private static final Logger logger = LoggerFactory.getLogger(MyElasticJobListener.class);

    private long beginTime = 0;
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        beginTime = System.currentTimeMillis();

        logger.info("===>{} JOB BEGIN TIME: {} <===",shardingContexts.getJobName(), TimeUtil.mill2Time(beginTime));
    }

    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        long endTime = System.currentTimeMillis();
        logger.info("===>{} JOB END TIME: {},TOTAL CAST: {} <===",shardingContexts.getJobName(), TimeUtil.mill2Time(endTime), endTime - beginTime);
    }
}

5.配置JobConfiuration,配置job随容器一起启动

@Configuration
public class ElasticJobConfig {
    @Autowired
    private ZookeeperRegistryCenter regCenter;
    /**
     * 配置任务监听器
     * @return
     */
    @Bean
    public ElasticJobListener elasticJobListener() {
        return new MyElasticJobListener();
    }
    /**
     * 配置任务详细信息
     * @param jobClass
     * @param cron
     * @param shardingTotalCount
     * @param shardingItemParameters
     * @return
     */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                         final String cron,
                                                         final int shardingTotalCount,
                                                         final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                        .shardingItemParameters(shardingItemParameters).build()
                , jobClass.getCanonicalName())
        ).overwrite(true).build();
    }
    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJobDemo simpleJob,
                                           @Value("${stockJob.cron}") final String cron,
                                           @Value("${stockJob.shardingTotalCount}") final int shardingTotalCount,
                                           @Value("${stockJob.shardingItemParameters}") final String shardingItemParameters) {
        MyElasticJobListener elasticJobListener = new MyElasticJobListener();
        return new SpringJobScheduler(simpleJob, regCenter,
                getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters),
                elasticJobListener);
    }
}

配置文件如下

server.port=${random.int[10000,19999]}
regCenter.serverList = localhoost:2181
regCenter.namespace = elastic-job-lite-springboot

stockJob.cron = 0/5 * * * * ?
stockJob.shardingTotalCount = 4
stockJob.shardingItemParameters = 0=0,1=1,2=0,3=1

 

启动项目,输出

  

 大家可以仔细的观察下配置文件和输出的对应关系。假设我先有这样的需求。

定时任务在两台主机上A,B同时运行.A处理id是奇数的数据,B处理Id为偶数的数据

用传统的定时任务例如@Schedule注解能实现么?那当然是可以,直接写死硬编码即可。用Elastic-Job的话,则相当灵活,示例如下

public class DbQueryJob implements SimpleJob {
    @Autowired
    private XXXDao xxxDao;
    @Override
    public void execute(ShardingContext shardingContext) {
        String shardingParameter = shardingContext.getShardingParameter();
        //mod是对id取余后的结果
        List<xxx> xxxlists=xxxDao.select("select * from table where mod="+shardingParameter);
        xxxlists.forEach(x->{
            System.out.println("参数:"+shardingContext.getShardingParameter()+"状态"+x.getStatus());
        });
    }
}

稍微讲下原理

Elastic-Job默认采用平均分片策略

如果有3台服务器,分成9片,则每台服务器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]

如果有3台服务器,分成8片,则每台服务器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]

如果有3台服务器,分成10片,则每台服务器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]

上述配置文件我们分成4片,A服务分到0,1  B服务分到2,3,同时 shardingItemParameters 参数表明了每个分片对应的ItemParameters,所以A服务的 shardingContext.getShardingParameter()=0

通过这样的逻辑,我们就能实现我们的分片业务了。

以上,就是Springboot整合Elastic-Job

 

 源码

 

以上是关于Springboot整合Elastic-Job的主要内容,如果未能解决你的问题,请参考以下文章

elasticjob学习一:simplejob初识和springboot整合

springboot整合quartz项目使用(含完整代码)

springboot-01整合quartz

elastic-job集成到springboot教程,和它的一个异常处理办法:Sharding item parameters '1' format error, should be

elastic-job-lite使用文档

微服务分布式调度Elastic-job