Spring Batch学习记录及示例项目代码

Posted 浅殇忆流年

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Batch学习记录及示例项目代码相关的知识,希望对你有一定的参考价值。

Spring Batch学习记录总结

关于Spring Batch的相关知识点参考:https://mp.weixin.qq.com/s/OUMwyo2EopXkSHGn2OlghQ

Spring Batch的一些基本概念:

  • ItemReader:对资源的读处理,如从数据库查询、文件读取、变量读取等。

  • ItemProcessor:对读取的数据进行处理,可以实现自己的业务逻辑操作来对数据处理,如对数据进行计算、逻辑处理、格式转换等。

  • ItemWriter:对资源的写处理,如写入数据库、写入文件、打印日志等。

  • Step:一个完整的批处理步骤,一个Step是由ItemReader、ItemProcessor、ItemWriter三部分组成。

  • Job:代表一个完整的批处理过程,一个Job由一个或多个Step组成。

  • Listener:监听器,可以对Step、Job状态进行监听,我们可以实现监听方法,对其进行一些逻辑处理,如打印日志等。

  • JobLauncher:负责启动Job。

  • JobRepository:存储关于配置和执行的Job(作业)的元数据。

简单使用

1、创建一个SpringBoot项目。

2、在项目中创建包config,并在该包下面创建一个Configuration类。

在类上面加上@Configuration;@EnableBatchProcessing;两个注解。

    @Configuration
    @EnableBatchProcessing
    public class SingleJobConfiguration {
    
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
    
        /**
         * 程序执行的入口
         * @return
         */
        @Bean
        public Job helloJob(){
            return jobBuilderFactory.get("hellojpb-1")
                    .start(step1())
                    .build();
        }
    
        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                            System.out.println("hello spring batch");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }
    }

3、现在启动项目,发现控制台出现以下错误信息。

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-05-22 13:00:10.396 ERROR 15548 --- [           main] o.s.b.d.LoggingFailureAnalysisReporter   : 

***************************
APPLICATION FAILED TO START
***************************

Description:

Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured.

Reason: Failed to determine a suitable driver class


​ Action:

Consider the following:
If you want an embedded database (H2, HSQL or Derby), please put it on the classpath.
If you have database settings to be loaded from a particular profile you may need to activate it (no profiles are currently active).

解决方法:由于Spring Batch在运行的时候需要数据库来存储一些具体的信息;因此我们需要配置具体的数据库信息。

方式一:配置内存数据库H2。

         <dependency>
                <groupId>com.h2database</groupId>
                <artifactId>h2</artifactId>
                <scope>runtime</scope>
         </dependency>

方式二:配置mysql数据库。

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.25</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>

Mysql数据库连接信息:

    spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?useSSL=false
    spring.datasource.username=root
    spring.datasource.password=123456
 
    spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
    spring.batch.initialize-schema=always

Spring Batch具体分析

  • Step有两种实现方式。一个是tasklet的方式;一个是chunk方式。

    tasklet方式:

    在Step中,执行单个任务,Job有多个Step按照一定的顺序来组成的,每个步骤执行一个具体的任务。

    chunk方式:

    该方法是基于数据块(一部分数据)执行的,每个任务又都可以分为Read-Process-Write。

  • Spring Batch框架主要有四个角色:
    JobLauncher:任务启动器,通过它来启动任务,可以看作程序的入口。
    Job:代表一个具体的任务。
    Step:代表一个具体的步骤。一个Job有1个或多个Step构成。
    JobRepository:是存储数据的地方,可以看作一个数据库的接口,在任务执行的时候需要通过它来记录任务状态等信息。

Step的两种实现方式:

   //方式1   chunk
        @Bean
        Step step() {
            return stepBuilderFactory.get(STEP)
                    .<Music, Music>chunk(2)
                    .reader(itemReader())
                    .writer(itemWriter())
                    .build();
    
        }
    //方式2   tasklet
        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                            System.out.println("hello spring batch");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

Flow的创建和使用

  • Flow是多个Step的集合。
  • 可以被多个Job复用。
  • 使用FlowBuilder来创建。
    @Configuration
    @EnableBatchProcessing
    public class FlowDemo {

        @Autowired
        private JobBuilderFactory jobBuilderFactory;
        
        @Autowired
        private StepBuilderFactory stepBuilderFactory;


        /**
         * 创建 Step1
         * @return
         */
        @Bean
        public Step flowDemoStep1() {
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                            System.out.println("hello spring flowDemoStep1");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }
        
        /**
         * 创建Step2
         * @return
         */
        @Bean
        public Step flowDemoStep2() {
            return stepBuilderFactory.get("step-2").tasklet(
                    new Tasklet() {
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                            System.out.println("hello spring flowDemoStep2");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }
        
        /**
         * 创建Step3
         * @return
         */
        @Bean
        public Step flowDemoStep3() {
            return stepBuilderFactory.get("step-3").tasklet(
                    new Tasklet() {
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                            System.out.println("hello spring flowDemoStep3");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }
        
        /**
         * 创建Flow 对象   ,指明Flow对象包含哪些Step
         */
        public Flow  flowDemoFlow(){
            return new FlowBuilder<Flow>("flowDemoFlow")
                    .start(flowDemoStep1())
                    .next(flowDemoStep2())
                    .build();
        }
        
        /**
         * 创建Job对象
         * @return
         */
        @Bean
        public Job  job(){
            return jobBuilderFactory.get("flowjob")
                    .start(flowDemoFlow())
                    .next(flowDemoStep3())
                    .end()
                    .build();
        
        }

split实现并发执行

实现任务中的多个Step或多个flow并发执行。

关键代码:

    @Configuration
    @EnableBatchProcessing
    public class SplitDemo {
    
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
    
        /**
         * 创建 Step1
         * @return
         */
        @Bean
        public Step splitDemoStep1() {
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                            System.out.println("hello spring splitDemoStep1");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }
    
        /**
         * 创建Step2
         * @return
         */
        @Bean
        public Step  splitDemoStep2() {
            return stepBuilderFactory.get("step-2").tasklet(
                    new Tasklet() {
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                            System.out.println("hello spring splitDemoStep2");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }
    
        /**
         * 创建Step3
         * @return
         */
        @Bean
        public Step  splitDemoStep3() {
            return stepBuilderFactory.get("step-3").tasklet(
                    new Tasklet() {
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                            System.out.println("hello spring splitDemoStep3");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

创建Flow 对象 ,指明Flow对象包含哪些Step:

      public Flow  splitDemoFlow1(){return new FlowBuilder<Flow>("splitDemoFlow1").start(splitDemoStep1()).build();}public Flow  splitDemoFlow2(){
            return new FlowBuilder<Flow>("splitDemoFlow2")
                    .start(splitDemoStep2())
                    .next(splitDemoStep3())
                    .build();
        }
        /**
         * 创建任务
         * @return
         */
        @Bean
        public Job  job(){
            return jobBuilderFactory.get("splitDemoJob")
                    .start(splitDemoFlow1())
                    .split(new SimpleAsyncTaskExecutor())
                    .add(splitDemoFlow2())
                    .end()
                    .build();
        }
    }

决策器的使用

接口:JobExecutionDecider

自定义决策器:MyDecider.java,实现JobExecutionDecider接口并且重写decide方法

public class MyDecider implements JobExecutionDecider {
    private int count;
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        count ++;
        if(count % 2 == 0){
            return new FlowExecutionStatus("even");
        }else {
            return new FlowExecutionStatus("odd");
        }
    }
}
@Configuration
@EnableBatchProcessing
public class DeciderDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;



    @Bean
    public Step deciderDemoStep1(){
       return stepBuilderFactory.get("deciderDemoStep1").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("hello spring deciderDemoStep1");
                return RepeatStatus.FINISHED;
        }
        }).build();
    }

    @Bean
    public Step deciderDemoStep2() {
        return stepBuilderFactory.get("deciderDemoStep2").tasklet(
                new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("even");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }
    @Bean
    public Step deciderDemoStep3() {
        return stepBuilderFactory.get("deciderDemoStep3").tasklet(
                new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("odd");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }


    /**
     * 创建决策器
     * @return
     */
    @Bean
    public JobExecutionDecider myDecider(){
        return new MyDecider();
    }

    @Bean
    public Job  deciderDemoJob(){
        return jobBuilderFactory.get("deciderDemoJob")
                .start(deciderDemoStep1())
                .next(myDecider())
                .from(myDecider()).on("even").to(deciderDemoStep2())
                .from(myDecider()).on("odd").to(deciderDemoStep3())
                .from(deciderDemoStep3()).on("*").to(myDecider())
                .end()
                .build();
    }
}

完整例子

主要代码如下:
1、项目总体结构图。

2、在pom.xml文件中引入依赖。

        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!--引入Spring Batch-->
        <dependency>以上是关于Spring Batch学习记录及示例项目代码的主要内容,如果未能解决你的问题,请参考以下文章

Spring Batch 项目读取监听器

限制 JdbcPagingItemReader 在 Spring Batch 项目中可以读取的总记录数

Spring batch - 项目阅读器知道阅读器处理的最后一条记录

Spring学习10-SpringMV核心组件2及SpringMVC项目示例

spring batch(二):核心部分:配置Spring batch

Spring-batch学习总结—Job,Flow创建及应用,多线程并发,决策器,监听器,参数