ElasticJob简单使用

Posted 赵镇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ElasticJob简单使用相关的知识,希望对你有一定的参考价值。

ElasticJob单点使用

任务类

public class BackupJob implements SimpleJob {
    public void execute(ShardingContext shardingContext) {
            String selectSql = "select * from resume where state=\'未归档\' limit 1";
        List<Map<String, Object>> list =
                JdbcUtil.executeQuery(selectSql);
        if(list == null || list.size() == 0) {
            return;
        }
        Map<String, Object> stringObjectMap = list.get(0);
        long id = (long) stringObjectMap.get("id");
        String name = (String) stringObjectMap.get("name");
        String education = (String)
                stringObjectMap.get("education");
// 打印出这条记录
        System.out.println("======>>>id:" + id + " name:" +
                name + " education:" + education);
// 更改状态
        String updateSql = "update resume set state=\'已归档\' where id=?";
        JdbcUtil.executeUpdate(updateSql,id);
// 归档这条记录
        String insertSql = "insert into resume_bak select * from resume where id=?";
        JdbcUtil.executeUpdate(insertSql,id);
    }

}

主要的任务就是将未归档的数据整理到归档的表中,表结构一样
执行类

public class JobMain {
    public static void main(String[] args) {
        //初始化注册中心
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181","data-job");
        CoordinatorRegistryCenter coordinatorRegistryCenter= new ZookeeperRegistryCenter(zookeeperConfiguration);
        coordinatorRegistryCenter.init();
        //创建任务
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("data-job","*/2 * * * * ?",1).build();
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,BackupJob.class.getName());
        //执行任务
        new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build()).init();
    }
}

这种情况下,启动两个任务类只会有一个在执行任务。但是当一个任务停止之后,另一个任务会立马开始接着执行任务,相当于其他中间件中的主备切换。但是这里的主备切换是依托zk进行的

多节点分布式任务调度

修改执行类的代码为

public class JobMain {
    public static void main(String[] args) {
        //初始化注册中心
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181","data-job");
        CoordinatorRegistryCenter coordinatorRegistryCenter= new ZookeeperRegistryCenter(zookeeperConfiguration);
        coordinatorRegistryCenter.init();
        //创建任务
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("data-job","*/2 * * * * ?",3).build();
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,BackupJob.class.getName());
        //执行任务
        new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build()).init();
    }
}

除了修改分片数还需要在执行任务的类中执行相应的分片参数,另外需要注意的是仅仅增加分票策略是不生效的,必须同时配置分片参数。另外如果使用同一个job来做执行的话。需要增加overwrite为true

执行器代码为
```
public class BackupJob implements SimpleJob {
public void execute(ShardingContext shardingContext) {
    int shardingitem = shardingContext.getShardingItem();
    System.out.println("当前分片"+shardingitem);
    String shardingParamter = shardingContext.getShardingParameter();
    System.out.println(shardingParamter);
        String selectSql = "select * from resume where state=\'未归档\' and name=\'"+shardingParamter+"\' limit 1";
    List<Map<String, Object>> list =
            JdbcUtil.executeQuery(selectSql);
    if(list == null || list.size() == 0) {
        return;
    }
    Map<String, Object> stringObjectMap = list.get(0);
    long id = (long) stringObjectMap.get("id");
    String name = (String) stringObjectMap.get("name");
    String education = (String)
            stringObjectMap.get("education");

// 打印出这条记录

    System.out.println("======>>>id:" + id + " name:" +
            name + " education:" + education);

// 更改状态

    String updateSql = "update resume set state=\'已归档\' where id=?";
    JdbcUtil.executeUpdate(updateSql,id);

// 归档这条记录

    String insertSql = "insert into resume_bak select * from resume where id=?";
    JdbcUtil.executeUpdate(insertSql,id);
}

}

测试结果为,当执行器未全部启动时,所有分片在一个执行器上运行。当三个执行器都启动时,会平均分配到三个执行器。

demo代码地址为https://github.com/zhendiao/deme-code/tree/main/schedule_job

以上是关于ElasticJob简单使用的主要内容,如果未能解决你的问题,请参考以下文章

ElasticJob和SpringBoot

elasticjob学习二:封装elasticjob-spring-boot-starter

elasticjob学习一:simplejob初识和springboot整合

elastic job 问题汇总

分布式任务elasticjob框架原理了解及使用

SpringBoot定时任务 - 什么是ElasticJob?如何集成ElasticJob实现分布式任务调度?