spring-batch入门

Posted evilposeidon

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring-batch入门相关的知识,希望对你有一定的参考价值。

企业中经常会需要批处理才能处理完成的业务操作,比如:自动化地处理大批量复杂数据,如月结计算;重复性地处理大批量数据,如费率计算;充当内部系统和外部系统的数据纽带,中间需要对数据进行格式化,校验,转换处理等.

1.框架搭建

技术图片

在pom中导入Spring Batch,mysql,和JDBC依赖,

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cc.mrbird</groupId>
    <artifactId>spring-batch-start</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-batch-start</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

在编写代码前,了解一下Spring Batch的构成

技术图片

? spring-batch里最基本的单元就是人物Job,一个job由若干个步骤step组成,任务启动器job launcher负责运行job,任务存储仓库job Repository存贮者job的执行状态,参数,和日志等信息.

? job可以分为三大类:

  • 数据读取item Reader

  • 数据中间处理item Processor

  • 数据输出item Writer

    任务存储仓库可以是关系型数据库MySQL,菲关系型数据库MongoDB或直接存储字内存中

2.编写第一个任务

在springboot的入口类上添加@EnableBatchProcessing注解,表示开启SpringBatch批处理功能

技术图片

步骤step是由若干个小人物taskLet组成的,所以我们通过tasklet方法创建,tasklet方法接受了一个tasklet类型参数,tasklet是一个函数式接口,源码如下

public interface Tasklet {
    @Nullable
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}

我们可以使用lambda表达式创建一个匿名实现:

(contribution, chunkContext) -> {
    System.out.println("执行步骤....");
    return RepeatStatus.FINISHED;
}

技术图片

3.多步骤任务

一个复杂的任务一般包含多个步骤,下面举个多步骤任务的例子。在job包下新建MultiStepJobDemo类:

@Component
public class MultiStepJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job multiStepJob() {
        return jobBuilderFactory.get("multiStepJob")
                .start(step1())
                .next(step2())
                .next(step3())
                .build();
    }

    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
}

上面代码中,我们通过step1()step2()step3()三个方法创建了三个步骤。Job里要使用这些步骤,只需要通过JobBuilderFactorystart方法指定第一个步骤,然后通过next方法不断地指定下一个步骤即可。

启动项目,控制台打印日志如下:

2020-03-06 13:52:52.188  INFO 18472 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=multiStepJob]] launched with the following parameters: [{}]
2020-03-06 13:52:52.222  INFO 18472 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
执行步骤一操作。。。
2020-03-06 13:52:52.251  INFO 18472 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 29ms
2020-03-06 13:52:52.292  INFO 18472 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
执行步骤二操作。。。
2020-03-06 13:52:52.323  INFO 18472 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step2] executed in 30ms
2020-03-06 13:52:52.375  INFO 18472 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step3]
执行步骤三操作。。。
2020-03-06 13:52:52.405  INFO 18472 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step3] executed in 29ms
2020-03-06 13:52:52.428  INFO 18472 --- [           main] o.s.b.c.l.support.SimpleJobLauncher

三个步骤依次执行成功。

多个步骤在执行过程中也可以通过上一个步骤的执行状态来决定是否执行下一个步骤,修改上面的代码:

@Component
public class MultiStepJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job multiStepJob() {
        return jobBuilderFactory.get("multiStepJob2")
                .start(step1())
                .on(ExitStatus.COMPLETED.getExitCode()).to(step2())
                .from(step2())
                .on(ExitStatus.COMPLETED.getExitCode()).to(step3())
                .from(step3()).end()
                .build();
    }

    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
}

multiStepJob()方法的含义是:multiStepJob2任务先执行step1,当step1状态为完成时,接着执行step2,当step2的状态为完成时,接着执行step3。ExitStatus.COMPLETED常量表示任务顺利执行完毕,正常退出,该类还包含以下几种退出状态:

public class ExitStatus implements Serializable, Comparable<ExitStatus> {

    /**
     * Convenient constant value representing unknown state - assumed not
     * continuable.
     */
    public static final ExitStatus UNKNOWN = new ExitStatus("UNKNOWN");

    /**
     * Convenient constant value representing continuable state where processing
     * is still taking place, so no further action is required. Used for
     * asynchronous execution scenarios where the processing is happening in
     * another thread or process and the caller is not required to wait for the
     * result.
     */
    public static final ExitStatus EXECUTING = new ExitStatus("EXECUTING");

    /**
     * Convenient constant value representing finished processing.
     */
    public static final ExitStatus COMPLETED = new ExitStatus("COMPLETED");

    /**
     * Convenient constant value representing job that did no processing (e.g.
     * because it was already complete).
     */
    public static final ExitStatus NOOP = new ExitStatus("NOOP");

    /**
     * Convenient constant value representing finished processing with an error.
     */
    public static final ExitStatus FAILED = new ExitStatus("FAILED");

    /**
     * Convenient constant value representing finished processing with
     * interrupted status.
     */
    public static final ExitStatus STOPPED = new ExitStatus("STOPPED");

    ...
}

技术图片

4.Flow的用法

Flow的作用就是可以将多个步骤Step组合在一起然后再组装到任务job中

@Component
public class FlowJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job flowJob() {
        return jobBuilderFactory.get("flowJob")
                .start(flow())
                .next(step3())
                .end()
                .build();
    }

    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    // 创建一个flow对象,包含若干个step
    private Flow flow() {
        return new FlowBuilder<Flow>("flow")
                .start(step1())
                .next(step2())
                .build();
    }
}

通过FlowBuilder将step1和step2组合在一起,创建了一个名为flow的Flow,然后将其赋给任务job,使用flow和step构建job的区别是,job流程中包含flow类型的时候需要在build方法前调用end()方法

5.并行执行

? 任务中的步骤可以串行执行外,还可以并行执行,并行执行在特定的业务需求下可以提供任务执行效率

? 将任务并行化只需要简单步骤:

  1. 将步骤Step转换为Flow
  2. 将任务Job中指定并行Flow
@Component
public class SplitJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job splitJob() {
        return jobBuilderFactory.get("splitJob")
                .start(flow1())
                .split(new SimpleAsyncTaskExecutor()).add(flow2())
                .end()
                .build();

    }

    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Flow flow1() {
        return new FlowBuilder<Flow>("flow1")
                .start(step1())
                .next(step2())
                .build();
    }

    private Flow flow2() {
        return new FlowBuilder<Flow>("flow2")
                .start(step3())
                .build();
    }
}

上面的例子中,我们创建了两个Flow:flow(包含step1和step2)和flow2(包含step3)然后通过jobBuilderFactory的split方法,指定一个异步执行器,将flow1和flow2异步执行

技术图片

6.任务决策器

? 决策器的作用就是可以指定程序在不同的情况下运行不同的流程,比如今天是周末,则让任务执行step1和step2,如果是工作日,则选择step1和step3

? 使用决策器前,我们需要自定义一个决策器的实现

@Component
public class MyDecider implements JobExecutionDecider {
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        LocalDate now = LocalDate.now();
        DayOfWeek dayOfWeek = now.getDayOfWeek();

        if (dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY) {
            return new FlowExecutionStatus("weekend");
        } else {
            return new FlowExecutionStatus("workingDay");
        }
    }
}

MyDecider实现JobExecutionDecider接口的decide方法,该方法返回FlowExecutionStatus。上面的逻辑是:判断今天是否是周末,如果是,返回FlowExecutionStatus("weekend")状态,否则返回FlowExecutionStatus("workingDay")状态。

下面演示如何在任务Job里使用决策器。在job包下新建DeciderJobDemo

@Component
public class DeciderJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private MyDecider myDecider;

    @Bean
    public Job deciderJob() {
        return jobBuilderFactory.get("deciderJob")
                .start(step1())
                .next(myDecider)
                .from(myDecider).on("weekend").to(step2())
                .from(myDecider).on("workingDay").to(step3())
                .from(step3()).on("*").to(step4())
                .end()
                .build();
    }

    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }


    private Step step4() {
        return stepBuilderFactory.get("step4")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤四操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

上面代码中,我们注入了自定义决策器MyDecider,然后在jobDecider()方法里使用了该决策器:

@Bean
public Job deciderJob() {
    return jobBuilderFactory.get("deciderJob")
            .start(step1())
            .next(myDecider)
            .from(myDecider).on("weekend").to(step2())
            .from(myDecider).on("workingDay").to(step3())
            .from(step3()).on("*").to(step4())
            .end()
            .build();
}

这段代码的含义是:任务deciderJob首先执行step1,然后指定自定义决策器,如果决策器返回weekend,那么执行step2,如果决策器返回workingDay,那么执行step3,那么无论step3 的结果如何都将执行step4

技术图片

7.任务嵌套

任务job除了可以由step或者Flow构成外,我们还可以将多个job转换为特殊的step.然后再赋给另一个任务job,这就是任务的嵌套

@Component
public class NestedJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private PlatformTransactionManager platformTransactionManager;

    // 父任务
    @Bean
    public Job parentJob() {
        return jobBuilderFactory.get("parentJob")
                .start(childJobOneStep())
                .next(childJobTwoStep())
                .build();
    }


    // 将任务转换为特殊的步骤
    private Step childJobOneStep() {
        return new JobStepBuilder(new StepBuilder("childJobOneStep"))
                .job(childJobOne())
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }

    // 将任务转换为特殊的步骤
    private Step childJobTwoStep() {
        return new JobStepBuilder(new StepBuilder("childJobTwoStep"))
                .job(childJobTwo())
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }

    // 子任务一
    private Job childJobOne() {
        return jobBuilderFactory.get("childJobOne")
                .start(
                    stepBuilderFactory.get("childJobOneStep")
                            .tasklet((stepContribution, chunkContext) -> {
                                System.out.println("子任务一执行步骤。。。");
                                return RepeatStatus.FINISHED;
                            }).build()
                ).build();
    }

    // 子任务二
    private Job childJobTwo() 
        return jobBuilderFactory.get("childJobTwo")
                .start(
                    stepBuilderFactory.get("childJobTwoStep")
                            .tasklet((stepContribution, chunkContext) -> {
                                System.out.println("子任务二执行步骤。。。");
                                return RepeatStatus.FINISHED;
                            }).build()
                ).build();
    }
}

上面的代码中,我们通过childJobOne()和childJobTwo()方法创建两个任务Job,关键在于childJobOne()和childJobTwo(),在childJobOne()方法中,我们通过JobStepBuilder构建了一个名称为childJobOneStep的Step,顾名思义,他是一个任务型Step的构造工厂,可以将任务转换为特殊的步骤,在构建过程中,我们还需要传入任务执行器JobLauncher,任务仓库JobRepository和事务管理器PlantformTransactionManager.将任务转换为特殊步骤后,将其赋给父任务parentJob即可

技术图片

以上是关于spring-batch入门的主要内容,如果未能解决你的问题,请参考以下文章

保存图像库 64 AngularJS Spring-Batch

推荐net开发cad入门阅读代码片段

Spring-batch(ItemWriter)数据写入数据库,普通文件,xml文件,多文件分类写入

Spring-Batch 没有将元数据持久化到数据库?

Spring-batch:如何在 Spring Batch 中使用 skip 方法捕获异常消息?

Spring-Batch学习总结——重要概念,环境搭建,名词解释,第一个项目及异常处理