Springboot整合Elastic-Job

Posted xmz_java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Springboot整合Elastic-Job相关的知识,希望对你有一定的参考价值。

上文我们讲到Springboot整合Elastic-Job整合的demo,只是简单的实现了主要功能。本文在上文基础上,进行新的调整。

事件追踪


 Elastic-Job提供了事件追踪功能,可通过事件订阅的方式处理调度过程的重要事件,用于查询、统计和监控。Elastic-Job目前提供了基于关系型数据库两种事件订阅方式记录事件。我们只需要将添加如下配置即可

/**
     * 将作业运行的痕迹进行持久化到DB
     */
    @Bean
    public JobEventConfiguration jobEventConfiguration(){
        return new JobEventRdbConfiguration(dataSource);
    }

项目运行后,Elastic-Job会自动创建JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG两张表以及若干索引。

 

使用注解


 

上文我们添加一个任务的步骤是,定义一个任务类,再在配置类中定义任务属性,并加入到SpringJobScheduler。如果我们有几百个任务,配置类基本就无法维护了。那怎么优化呢,我们可以参考@Schedual注解,在job上定义一个注解,每次启动的时候扫描注解自动将job加入到SpringJobScheduler中。

1.抽象添加job方法

@Component
public class ElasticJobHandler {
    @Autowired
    private ZookeeperRegistryCenter regCenter;
    @Resource
    private JobEventConfiguration jobEventConfiguration;
    @Resource
    private ElasticJobListener elasticJobListener;

    /**
     * @Description 任務配置類
     */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                         final String cron,
                                                         final int shardingTotalCount,
                                                         final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                        .shardingItemParameters(shardingItemParameters).build()
                        , jobClass.getCanonicalName())
        ).overwrite(true).build();
    }

    public void addJob(final SimpleJob simpleJob,
                       final String cron,
                       final Integer shardingTotalCount,
                       final String shardingItemParameters)
            throws IllegalAccessException, InstantiationException {
        LiteJobConfiguration jobConfig =
                getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters);

         new SpringJobScheduler(simpleJob, regCenter, jobConfig, jobEventConfiguration, elasticJobListener).init();
    }
}

 

2.添加ElasticScheduler注解

@Component
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticScheduler {
    /**
     * 任务名称
     * @return
     */
    String name();

    /**
     * cron表达式,用于控制作业触发时间
     * @return
     */
    String cron() default "";

    /**
     * 分片参数
     * @return
     */
    String shardingItemParameters() default "";

    /**
     * 总分片数
     * @return
     */
    int shardingTotalCount();

    /**
     * 任务描述信息
     * @return
     */
    String description() default "";
}

3.定义扫描方法

@Component
public class ElasticSchedulerAspect implements ApplicationContextAware, InitializingBean {
    private ApplicationContext applicationContext;
    @Autowired
    private ElasticJobHandler elasticJobHandler;
    @Override
    public void afterPropertiesSet() throws Exception {
        registrJob(applicationContext);
    }

    /**
     * 解析context信息,开始注册
     * @param applicationContext
     */
    private void registrJob(ApplicationContext applicationContext) {
        String[] beanNamesForAnnotation = applicationContext.getBeanNamesForAnnotation(ElasticScheduler.class);
        for (String beanName : beanNamesForAnnotation) {
            Class<?> handlerType = applicationContext.getType(beanName);
            Object bean = applicationContext.getBean(beanName);
            ElasticScheduler annotation = AnnotationUtils.findAnnotation(handlerType, ElasticScheduler.class);
            addJobToContext(annotation,bean);
        }
    }

    /**
     * 将任务添加到容器中
     * @param elasticScheduler
     * @param bean
     */
    private void addJobToContext(ElasticScheduler elasticScheduler, Object bean) {
        String cron = elasticScheduler.cron();
        String name = elasticScheduler.name();
        String description = elasticScheduler.description();
        String shardingItemParameters = elasticScheduler.shardingItemParameters();
        Integer shardingTotalCount = elasticScheduler.shardingTotalCount();
        try {
            elasticJobHandler.addJob((SimpleJob) bean,cron,shardingTotalCount,shardingItemParameters);
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InstantiationException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext=applicationContext;
    }

}

4.使用注解

@Component
@ElasticScheduler(cron = "0/5 * * * * ?",shardingTotalCount = 4,name = "测试注解",shardingItemParameters = "0=0,1=0,2=1,3=1")
public class StockSimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(String.format("------Thread ID: %s, 任務總片數: %s, " +
                        "當前分片項: %s.當前參數: %s," +
                        "當前任務名稱: %s.當前任務參數: %s"
                ,
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()

        ));
    }
}

注意,该注解只为了不想引入太多外部依赖自己随手写的,只为给大家提供思路。git上已经有人对用注解整合Elastic-Job了,大家可自行搜索。

 

以上是关于Springboot整合Elastic-Job的主要内容,如果未能解决你的问题,请参考以下文章

elasticjob学习一:simplejob初识和springboot整合

springboot整合quartz项目使用(含完整代码)

springboot-01整合quartz

elastic-job集成到springboot教程,和它的一个异常处理办法:Sharding item parameters '1' format error, should be

elastic-job-lite使用文档

微服务分布式调度Elastic-job