Spring Batch学习记录及示例项目代码
Posted 浅殇忆流年
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Batch学习记录及示例项目代码相关的知识,希望对你有一定的参考价值。
Spring Batch学习记录总结
关于Spring Batch的相关知识点参考:https://mp.weixin.qq.com/s/OUMwyo2EopXkSHGn2OlghQ
Spring Batch的一些基本概念:
-
ItemReader:对资源的读处理,如从数据库查询、文件读取、变量读取等。
-
ItemProcessor:对读取的数据进行处理,可以实现自己的业务逻辑操作来对数据处理,如对数据进行计算、逻辑处理、格式转换等。
-
ItemWriter:对资源的写处理,如写入数据库、写入文件、打印日志等。
-
Step:一个完整的批处理步骤,一个Step是由ItemReader、ItemProcessor、ItemWriter三部分组成。
-
Job:代表一个完整的批处理过程,一个Job由一个或多个Step组成。
-
Listener:监听器,可以对Step、Job状态进行监听,我们可以实现监听方法,对其进行一些逻辑处理,如打印日志等。
-
JobLauncher:负责启动Job。
-
JobRepository:存储关于配置和执行的Job(作业)的元数据。
简单使用
1、创建一个SpringBoot项目。
2、在项目中创建包config,并在该包下面创建一个Configuration类。
在类上面加上@Configuration
;@EnableBatchProcessing
;两个注解。
@Configuration
@EnableBatchProcessing
public class SingleJobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
/**
* 程序执行的入口
* @return
*/
@Bean
public Job helloJob(){
return jobBuilderFactory.get("hellojpb-1")
.start(step1())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step-1").tasklet(
new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("hello spring batch");
return RepeatStatus.FINISHED;
}
}
).build();
}
}
3、现在启动项目,发现控制台出现以下错误信息。
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-05-22 13:00:10.396 ERROR 15548 --- [ main] o.s.b.d.LoggingFailureAnalysisReporter :
***************************
APPLICATION FAILED TO START
***************************
Description:
Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured.
Reason: Failed to determine a suitable driver class
Action:
Consider the following:
If you want an embedded database (H2, HSQL or Derby), please put it on the classpath.
If you have database settings to be loaded from a particular profile you may need to activate it (no profiles are currently active).
解决方法:由于Spring Batch在运行的时候需要数据库来存储一些具体的信息
;因此我们需要配置具体的数据库信息。
方式一:配置内存数据库H2。
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
方式二:配置mysql数据库。
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
Mysql数据库连接信息:
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
spring.batch.initialize-schema=always
Spring Batch具体分析
-
Step有两种实现方式。一个是tasklet的方式;一个是chunk方式。
tasklet方式:
在Step中,执行单个任务,Job有多个Step按照一定的顺序来组成的,每个步骤执行一个具体的任务。
chunk方式:
该方法是基于数据块(一部分数据)执行的,每个任务又都可以分为Read-Process-Write。
-
Spring Batch框架主要有四个角色:
JobLauncher:任务启动器,通过它来启动任务,可以看作程序的入口。
Job:代表一个具体的任务。
Step:代表一个具体的步骤。一个Job有1个或多个Step构成。
JobRepository:是存储数据的地方,可以看作一个数据库的接口,在任务执行的时候需要通过它来记录任务状态等信息。
Step的两种实现方式:
//方式1 chunk
@Bean
Step step() {
return stepBuilderFactory.get(STEP)
.<Music, Music>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.build();
}
//方式2 tasklet
@Bean
public Step step1() {
return stepBuilderFactory.get("step-1").tasklet(
new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("hello spring batch");
return RepeatStatus.FINISHED;
}
}
).build();
}
Flow的创建和使用
- Flow是多个Step的集合。
- 可以被多个Job复用。
- 使用FlowBuilder来创建。
@Configuration
@EnableBatchProcessing
public class FlowDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
/**
* 创建 Step1
* @return
*/
@Bean
public Step flowDemoStep1() {
return stepBuilderFactory.get("step-1").tasklet(
new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("hello spring flowDemoStep1");
return RepeatStatus.FINISHED;
}
}
).build();
}
/**
* 创建Step2
* @return
*/
@Bean
public Step flowDemoStep2() {
return stepBuilderFactory.get("step-2").tasklet(
new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("hello spring flowDemoStep2");
return RepeatStatus.FINISHED;
}
}
).build();
}
/**
* 创建Step3
* @return
*/
@Bean
public Step flowDemoStep3() {
return stepBuilderFactory.get("step-3").tasklet(
new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("hello spring flowDemoStep3");
return RepeatStatus.FINISHED;
}
}
).build();
}
/**
* 创建Flow 对象 ,指明Flow对象包含哪些Step
*/
public Flow flowDemoFlow(){
return new FlowBuilder<Flow>("flowDemoFlow")
.start(flowDemoStep1())
.next(flowDemoStep2())
.build();
}
/**
* 创建Job对象
* @return
*/
@Bean
public Job job(){
return jobBuilderFactory.get("flowjob")
.start(flowDemoFlow())
.next(flowDemoStep3())
.end()
.build();
}
split实现并发执行
实现任务中的多个Step或多个flow并发执行。
关键代码:
@Configuration
@EnableBatchProcessing
public class SplitDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
/**
* 创建 Step1
* @return
*/
@Bean
public Step splitDemoStep1() {
return stepBuilderFactory.get("step-1").tasklet(
new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("hello spring splitDemoStep1");
return RepeatStatus.FINISHED;
}
}
).build();
}
/**
* 创建Step2
* @return
*/
@Bean
public Step splitDemoStep2() {
return stepBuilderFactory.get("step-2").tasklet(
new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("hello spring splitDemoStep2");
return RepeatStatus.FINISHED;
}
}
).build();
}
/**
* 创建Step3
* @return
*/
@Bean
public Step splitDemoStep3() {
return stepBuilderFactory.get("step-3").tasklet(
new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("hello spring splitDemoStep3");
return RepeatStatus.FINISHED;
}
}
).build();
}
创建Flow 对象 ,指明Flow对象包含哪些Step:
public Flow splitDemoFlow1(){
return new FlowBuilder<Flow>("splitDemoFlow1")
.start(splitDemoStep1())
.build();
}
public Flow splitDemoFlow2(){
return new FlowBuilder<Flow>("splitDemoFlow2")
.start(splitDemoStep2())
.next(splitDemoStep3())
.build();
}
/**
* 创建任务
* @return
*/
@Bean
public Job job(){
return jobBuilderFactory.get("splitDemoJob")
.start(splitDemoFlow1())
.split(new SimpleAsyncTaskExecutor())
.add(splitDemoFlow2())
.end()
.build();
}
}
决策器的使用
接口:JobExecutionDecider
自定义决策器:MyDecider.java,实现JobExecutionDecider接口并且重写decide方法
public class MyDecider implements JobExecutionDecider {
private int count;
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
count ++;
if(count % 2 == 0){
return new FlowExecutionStatus("even");
}else {
return new FlowExecutionStatus("odd");
}
}
}
@Configuration
@EnableBatchProcessing
public class DeciderDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step deciderDemoStep1(){
return stepBuilderFactory.get("deciderDemoStep1").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("hello spring deciderDemoStep1");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step deciderDemoStep2() {
return stepBuilderFactory.get("deciderDemoStep2").tasklet(
new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("even");
return RepeatStatus.FINISHED;
}
}
).build();
}
@Bean
public Step deciderDemoStep3() {
return stepBuilderFactory.get("deciderDemoStep3").tasklet(
new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("odd");
return RepeatStatus.FINISHED;
}
}
).build();
}
/**
* 创建决策器
* @return
*/
@Bean
public JobExecutionDecider myDecider(){
return new MyDecider();
}
@Bean
public Job deciderDemoJob(){
return jobBuilderFactory.get("deciderDemoJob")
.start(deciderDemoStep1())
.next(myDecider())
.from(myDecider()).on("even").to(deciderDemoStep2())
.from(myDecider()).on("odd").to(deciderDemoStep3())
.from(deciderDemoStep3()).on("*").to(myDecider())
.end()
.build();
}
}
完整例子
主要代码如下:
1、项目总体结构图。
2、在pom.xml文件中引入依赖。
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<!--引入Spring Batch-->
<dependency>以上是关于Spring Batch学习记录及示例项目代码的主要内容,如果未能解决你的问题,请参考以下文章
限制 JdbcPagingItemReader 在 Spring Batch 项目中可以读取的总记录数
Spring batch - 项目阅读器知道阅读器处理的最后一条记录
Spring学习10-SpringMV核心组件2及SpringMVC项目示例