Learning Spring Batch
Posted by kevin_shen
Preface: this article was compiled by the editors of 小常识网 (cha138.com). It mainly covers learning Spring Batch, and we hope it is a useful reference.
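Spring Batch is a framework for processing large volumes of data. It is mainly used to read a large amount of data, process it, and then write the results out in a specified form.
Spring Batch consists mainly of the following parts:
- JobRepository: stores the metadata of batch runs (job instances, job executions, step executions), which lets jobs be tracked and restarted
- JobLauncher: the interface used to launch a Job
- Job: the task that is actually executed; it contains one or more Steps
- Step: a step contains an ItemReader, an ItemProcessor and an ItemWriter
- ItemReader: the interface for reading data
- ItemProcessor: the interface for processing data
- ItemWriter: the interface for writing data
Each of these main components only needs to be registered as a Spring bean. To enable batch-processing support, also annotate a configuration class with @EnableBatchProcessing. Spring Batch ships with many ItemReader and ItemWriter implementations for different data sources and targets. Processing and validating the data is done by implementing the ItemProcessor interface.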
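These contracts can be tried out without the framework. The plain-Java sketch below imitates what a chunk-oriented step does with its reader, processor and writer. `ReaderLike`, `ProcessorLike`, `WriterLike` and `ChunkLoop` are hypothetical names. They only mimic three contracts: `ItemReader` returns `null` when the input is exhausted, `ItemProcessor` returns `null` to filter an item out, and `ItemWriter` receives a whole chunk. Transactions, retries, skips and restart state are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins that only mimic the contracts of Spring Batch's
// ItemReader, ItemProcessor and ItemWriter; they are NOT the real interfaces.
interface ReaderLike<T> { T read(); }                  // null means "no more input"
interface ProcessorLike<I, O> { O process(I item); }   // null means "filter this item out"
interface WriterLike<T> { void write(List<T> items); } // receives a whole chunk at once

class ChunkLoop {
    // Runs a simplified chunk-oriented step and returns the number of chunks,
    // i.e. how many transactions Spring Batch would commit.
    static <I, O> int run(ReaderLike<I> reader, ProcessorLike<I, O> processor,
                          WriterLike<O> writer, int chunkSize) {
        int chunks = 0;
        while (true) {
            List<O> outputs = new ArrayList<>();
            int read = 0;
            I item;
            // Read and process one item at a time until the chunk is full or the input ends.
            while (read < chunkSize && (item = reader.read()) != null) {
                read++;
                O out = processor.process(item);
                if (out != null) {
                    outputs.add(out);
                }
            }
            if (read == 0) {
                return chunks; // input exhausted: nothing left to commit
            }
            if (!outputs.isEmpty()) {
                writer.write(outputs); // one write call for the whole chunk
            }
            chunks++;
        }
    }

    public static void main(String[] args) {
        Iterator<String> input = Arrays.asList("a", "bb", "ccc", "dd", "e").iterator();
        List<List<Integer>> written = new ArrayList<>();
        ReaderLike<String> reader = () -> input.hasNext() ? input.next() : null;
        ProcessorLike<String, Integer> lengths = s -> s.length() > 1 ? s.length() : null; // drops 1-char strings
        WriterLike<Integer> writer = written::add;
        int chunks = run(reader, lengths, writer, 2);
        System.out.println(chunks + " chunks, written: " + written); // 3 chunks, written: [[2], [3, 2]]
    }
}
```

With a chunk size of 2 and five inputs, the sketch forms three chunks but calls the writer only twice, because the last chunk holds a single filtered item. In the real framework, the size passed to `chunk(...)` is the commit interval: each chunk is written in one transaction.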
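Spring Boot support
Spring Boot's support for Spring Batch lives in the org.springframework.boot.autoconfigure.batch package.
Spring Boot automatically initializes the database tables in which Spring Batch stores its batch-processing records.
Depending on the Spring Boot version, the Spring Batch starter may also pull in the HSQLDB driver. Keep it or exclude it as needed (this example uses Oracle).
The following example shows Spring Boot's Spring Batch support by importing a CSV file into an Oracle database: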
1. Entity class
```java
import javax.validation.constraints.Size;

public class Person {

    @Size(max = 4, min = 2) // validate the data with a JSR-303 annotation: name must be 2 to 4 characters
    private String name;

    private int age;

    private String nation;

    private String address;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getNation() {
        return nation;
    }

    public void setNation(String nation) {
        this.nation = nation;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }
}
```
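`@Size(min = 2, max = 4)` on a `String` constrains its length. By Bean Validation convention, a `null` value counts as valid, so add `@NotNull` if the field is mandatory. The sketch below restates that rule in plain Java so it can be tried without a validation provider. `SizeRule` is a hypothetical helper, not part of JSR-303. It counts length with `String.length()` (UTF-16 code units), which is how Hibernate Validator measures a `CharSequence`.

```java
// Hypothetical helper restating what @Size(min, max) checks on a String field.
class SizeRule {
    static boolean isValid(String value, int min, int max) {
        if (value == null) {
            return true; // Bean Validation treats null as valid; @NotNull handles presence
        }
        int length = value.length(); // UTF-16 code units, like String.length()
        return length >= min && length <= max;
    }

    public static void main(String[] args) {
        String[] names = {"A", "Ann", "Alexander", null};
        for (String name : names) {
            System.out.println(name + " -> " + isValid(name, 2, 4));
        }
        // A -> false, Ann -> true, Alexander -> false, null -> true
    }
}
```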
2. Validator
```java
import java.util.Set;

import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;

import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;

public class CsvBeanValidator<T> implements Validator<T>, InitializingBean {
    private javax.validation.Validator validator;

    @Override
    public void afterPropertiesSet() throws Exception {
        // Initialize the JSR-303 Validator that will be used to check our data
        ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
        validator = validatorFactory.usingContext().getValidator();
    }

    @Override
    public void validate(T value) throws ValidationException {
        // Check the item with the JSR-303 Validator's validate method
        Set<ConstraintViolation<T>> constraintViolations = validator.validate(value);
        if (!constraintViolations.isEmpty()) {
            // Collect every violation message into a single exception
            StringBuilder message = new StringBuilder();
            for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                message.append(constraintViolation.getMessage()).append("\n");
            }
            throw new ValidationException(message.toString());
        }
    }
}
```
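`CsvBeanValidator` gathers every constraint violation before throwing, so a single exception reports all the problems with an item. Below is a plain-Java sketch of the same collect-then-throw pattern, with simple predicates standing in for JSR-303 constraints. `CollectingValidator` is hypothetical. It throws `IllegalArgumentException` where the real class throws Spring Batch's `ValidationException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical plain-Java sketch of the "collect every violation, then throw once"
// pattern used by CsvBeanValidator; it uses simple predicates instead of JSR-303.
class CollectingValidator<T> {
    private final List<Predicate<T>> checks = new ArrayList<>();
    private final List<String> messages = new ArrayList<>();

    // Registers a rule together with the message reported when it fails.
    CollectingValidator<T> rule(Predicate<T> check, String message) {
        checks.add(check);
        messages.add(message);
        return this;
    }

    // Throws a single exception whose message lists every failed rule, one per line.
    void validate(T value) {
        StringBuilder failures = new StringBuilder();
        for (int i = 0; i < checks.size(); i++) {
            if (!checks.get(i).test(value)) {
                failures.append(messages.get(i)).append('\n');
            }
        }
        if (failures.length() > 0) {
            throw new IllegalArgumentException(failures.toString());
        }
    }

    public static void main(String[] args) {
        CollectingValidator<String> names = new CollectingValidator<String>()
                .rule(s -> s.length() >= 2 && s.length() <= 4, "size must be between 2 and 4")
                .rule(s -> !s.contains(" "), "must not contain spaces");
        names.validate("Ann"); // passes silently
        try {
            names.validate(" ");
        } catch (IllegalArgumentException e) {
            System.out.print(e.getMessage()); // both messages, one per line
        }
    }
}
```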
3. ItemProcessor
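```java
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;

public class CsvItemProcessor extends ValidatingItemProcessor<Person> {

    @Override
    public Person process(Person item) throws ValidationException {
        super.process(item); // the custom validator only runs when super.process(item) is called

        // Simple transformation: Han ("汉族") becomes "01", every other nation becomes "02"
        if ("汉族".equals(item.getNation())) {
            item.setNation("01");
        } else {
            item.setNation("02");
        }
        return item;
    }
}
```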
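The processor's mapping rule can be isolated as a small pure function, which makes it easy to test on its own. In this hypothetical `NationCodes` sketch the comparison is written constant-first (`HAN.equals(nation)`), which keeps it safe if a record ever has a `null` nation. The constant spells 汉族 with Unicode escapes only so the file compiles under any source encoding.

```java
// Hypothetical plain-Java version of CsvItemProcessor's mapping rule.
class NationCodes {
    static final String HAN = "\u6c49\u65cf"; // Unicode escapes for the two-character word "Han (ethnicity)"

    // "01" for Han, "02" for everything else, including a missing (null) value.
    static String toCode(String nation) {
        return HAN.equals(nation) ? "01" : "02";
    }

    public static void main(String[] args) {
        System.out.println(toCode(HAN));   // prints 01
        System.out.println(toCode("Hui")); // prints 02
        System.out.println(toCode(null));  // prints 02
    }
}
```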
4. Job listener (a listener only needs to implement the JobExecutionListener interface and override its beforeJob and afterJob methods)
```java
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class CsvJobListener implements JobExecutionListener {

    long startTime;
    long endTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        System.out.println("Job started");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        System.out.println("Job finished");
        System.out.println("Elapsed: " + (endTime - startTime) + "ms");
    }
}
```
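`CsvJobListener` keeps `startTime` in a field of a singleton bean. If two executions of the job run at the same time, they overwrite each other's start time. One way around that is to key the start time by execution id, as the plain-Java sketch below does. `TimingListener` and its `beforeJob(long, long)` / `afterJob(long, long)` methods are hypothetical stand-ins. A real listener could use `jobExecution.getId()` as the key and read the clock itself. Here the timestamps are passed in explicitly so the arithmetic is easy to check.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a job listener that times each execution separately.
class TimingListener {
    private final Map<Long, Long> startTimes = new ConcurrentHashMap<>();

    void beforeJob(long executionId, long nowMillis) {
        startTimes.put(executionId, nowMillis);
    }

    // Returns the elapsed milliseconds for this execution and forgets its start time.
    long afterJob(long executionId, long nowMillis) {
        Long start = startTimes.remove(executionId);
        if (start == null) {
            throw new IllegalStateException("afterJob without beforeJob for " + executionId);
        }
        return nowMillis - start;
    }

    public static void main(String[] args) {
        TimingListener listener = new TimingListener();
        listener.beforeJob(1L, 1_000L);
        listener.beforeJob(2L, 1_200L); // a second, overlapping execution
        System.out.println("execution 1: " + listener.afterJob(1L, 1_500L) + "ms"); // 500ms
        System.out.println("execution 2: " + listener.afterJob(2L, 1_900L) + "ms"); // 700ms
    }
}
```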
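5. Configuration
```java
import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.validator.Validator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@EnableBatchProcessing
public class CsvBatchConfig {

    @Bean
    public ItemReader<Person> reader() throws Exception {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>(); // read the file with FlatFileItemReader
        reader.setResource(new ClassPathResource("people.csv")); // set the path of the CSV file via setResource
        reader.setLineMapper(new DefaultLineMapper<Person>() {{ // map each CSV line onto the domain class
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[] { "name", "age", "nation", "address" });
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person.class);
            }});
        }});
        return reader;
    }

    @Bean
    public ItemProcessor<Person, Person> processor() {
        CsvItemProcessor processor = new CsvItemProcessor(); // use our own ItemProcessor implementation
        processor.setValidator(csvBeanValidator()); // give the processor its validator
        return processor;
    }

    // Spring injects beans that already exist in the container as method parameters;
    // Spring Boot has already defined the DataSource.
    @Bean
    public ItemWriter<Person> writer(DataSource dataSource) {
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); // write to the database with JDBC batch updates
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
        String sql = "insert into person " + "(id,name,age,nation,address) "
                + "values(hibernate_sequence.nextval, :name, :age, :nation, :address)";
        writer.setSql(sql); // the SQL statement executed in batch
        writer.setDataSource(dataSource);
        return writer;
    }

    // Explicit JobRepository with the database type set to Oracle
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDatabaseType("oracle");
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    @Bean
    public Job importJob(JobBuilderFactory jobs, Step s1) {
        return jobs.get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(s1) // the step to run
                .end()
                .listener(csvJobListener()) // register the listener
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
            ItemProcessor<Person, Person> processor) {
        return stepBuilderFactory
                .get("step1")
                .<Person, Person>chunk(65000) // commit 65000 items per transaction
                .reader(reader) // bind the reader to the step
                .processor(processor) // bind the processor to the step
                .writer(writer) // bind the writer to the step
                .build();
    }

    @Bean
    public CsvJobListener csvJobListener() {
        return new CsvJobListener();
    }

    @Bean
    public Validator<Person> csvBeanValidator() {
        return new CsvBeanValidator<Person>();
    }
}
```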
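The `DefaultLineMapper` in `reader()` does two things for each line. First, `DelimitedLineTokenizer` splits the line (on commas by default) into fields named `name`, `age`, `nation` and `address`. Second, `BeanWrapperFieldSetMapper` copies those fields onto a `Person` through its setters, converting `age` to an `int`. The sketch below imitates both steps in plain Java. It assumes a header-less file with no quoted fields. `LineMapping` and its nested `Row` class are hypothetical stand-ins. Like the tokenizer in its default strict mode, the sketch rejects a line with the wrong number of columns.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical plain-Java imitation of DelimitedLineTokenizer + BeanWrapperFieldSetMapper.
class LineMapping {
    static final String[] NAMES = {"name", "age", "nation", "address"};

    // Step 1: split on commas and pair each token with its column name.
    static Map<String, String> tokenize(String line) {
        String[] tokens = line.split(",", -1); // -1 keeps trailing empty columns
        if (tokens.length != NAMES.length) {
            throw new IllegalArgumentException(
                    "expected " + NAMES.length + " fields but got " + tokens.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], tokens[i]);
        }
        return fields;
    }

    // Step 2: copy the named fields onto an object, converting types where needed.
    static Row toRow(Map<String, String> fields) {
        Row row = new Row();
        row.name = fields.get("name");
        row.age = Integer.parseInt(fields.get("age"));
        row.nation = fields.get("nation");
        row.address = fields.get("address");
        return row;
    }

    static class Row { // hypothetical stand-in for Person
        String name;
        int age;
        String nation;
        String address;
    }

    public static void main(String[] args) {
        Row row = toRow(tokenize("Wang,32,Han,Beijing"));
        System.out.println(row.name + " / " + row.age + " / " + row.nation + " / " + row.address);
    }
}
```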
6. application.properties
```properties
spring.datasource.driverClassName=oracle.jdbc.OracleDriver
spring.datasource.url=jdbc:oracle:thin:@localhost:1521:xe
spring.datasource.username=boot
spring.datasource.password=boot

# Launch the batch job automatically when the application starts
spring.batch.job.enabled=true

logging.level.org.springframework.web=DEBUG
```
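The example above runs the batch job automatically at startup. To trigger it manually instead, comment out the @Configuration annotation on CsvBatchConfig so that the class no longer takes effect. Then create a new configuration class, TriggerBatchConfig, that is identical to CsvBatchConfig except for the ItemReader bean definition. Finally, set spring.batch.job.enabled=false in application.properties.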
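```java
import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@EnableBatchProcessing
public class TriggerBatchConfig {

    // Step-scoped: created per step execution, so the file name can come from the job parameters
    @Bean
    @StepScope
    public FlatFileItemReader<Person> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile)
            throws Exception {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        reader.setResource(new ClassPathResource(pathToFile));
        reader.setLineMapper(new DefaultLineMapper<Person>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[] { "name", "age", "nation", "address" });
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person.class);
            }});
        }});

        return reader;
    }

    @Bean
    public ItemProcessor<Person, Person> processor() {
        CsvItemProcessor processor = new CsvItemProcessor();
        processor.setValidator(csvBeanValidator());
        return processor;
    }

    @Bean
    public ItemWriter<Person> writer(DataSource dataSource) {
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
        String sql = "insert into person " + "(id,name,age,nation,address) "
                + "values(hibernate_sequence.nextval, :name, :age, :nation, :address)";
        writer.setSql(sql);
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDatabaseType("oracle");
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    @Bean
    public Job importJob(JobBuilderFactory jobs, Step s1) {
        return jobs.get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(s1)
                .end()
                .listener(csvJobListener())
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
            ItemProcessor<Person, Person> processor) {
        return stepBuilderFactory
                .get("step1")
                .<Person, Person>chunk(65000)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public CsvJobListener csvJobListener() {
        return new CsvJobListener();
    }

    @Bean
    public Validator<Person> csvBeanValidator() {
        return new CsvBeanValidator<Person>();
    }
}
```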
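`@StepScope` is what makes the `jobParameters` expression work. The reader is not created at startup; it is created lazily for each step execution, once the job parameters are known. The plain-Java sketch below imitates that late binding with a factory function applied to each execution's parameters. `LateBinding` and `FileReaderStub` are hypothetical names. The real step scope achieves this with proxies rather than an explicit factory.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class LateBinding {
    // Hypothetical stand-in for the reader: it only remembers which file it will read.
    static class FileReaderStub {
        final String resource;

        FileReaderStub(String resource) {
            this.resource = resource;
        }
    }

    // The "bean definition": how to build a reader once the job parameters are known.
    static final Function<Map<String, String>, FileReaderStub> READER_FACTORY =
            params -> new FileReaderStub(params.get("input.file.name"));

    // Imitates the start of a step execution: a fresh reader bound to this run's parameters.
    static FileReaderStub startStep(Map<String, String> jobParameters) {
        return READER_FACTORY.apply(jobParameters);
    }

    public static void main(String[] args) {
        Map<String, String> first = new HashMap<>();
        first.put("input.file.name", "people.csv");
        Map<String, String> second = new HashMap<>();
        second.put("input.file.name", "people2.csv"); // hypothetical second file
        System.out.println(startStep(first).resource);  // people.csv
        System.out.println(startStep(second).resource); // people2.csv
    }
}
```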
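Controller code
```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DemoController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job importJob;

    @RequestMapping("/read")
    public String imp(String fileName) throws Exception {
        String path = fileName + ".csv";
        // A local variable rather than a field: the controller is a singleton shared by concurrent requests.
        // The "time" parameter makes every launch a new job instance.
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .addString("input.file.name", path)
                .toJobParameters();
        jobLauncher.run(importJob, jobParameters);
        return "ok";
    }
}
```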
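The `time` parameter is needed because Spring Batch identifies a JobInstance by the job name plus its identifying parameters. Launching an instance that has already completed is rejected with `JobInstanceAlreadyCompleteException`. The `RunIdIncrementer` does not help here, because a direct `jobLauncher.run(...)` call does not apply the incrementer. The plain-Java sketch below illustrates that identity rule. `InstanceKeys` is hypothetical, and it assumes every accepted launch completes successfully.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of how a job repository decides whether a launch is a new JobInstance.
class InstanceKeys {
    private final Set<String> completedInstances = new HashSet<>();

    // Identity = job name + identifying parameters (sorted so the key is order-independent).
    static String key(String jobName, Map<String, Object> parameters) {
        return jobName + new TreeMap<>(parameters);
    }

    // Returns false where Spring Batch would throw JobInstanceAlreadyCompleteException.
    // Simplification: every accepted launch is assumed to complete successfully.
    boolean launch(String jobName, Map<String, Object> parameters) {
        return completedInstances.add(key(jobName, parameters));
    }

    public static void main(String[] args) {
        InstanceKeys repository = new InstanceKeys();
        Map<String, Object> params = new HashMap<>();
        params.put("input.file.name", "people.csv");
        System.out.println(repository.launch("importJob", params)); // true: new instance
        System.out.println(repository.launch("importJob", params)); // false: already complete
        params.put("time", System.currentTimeMillis());
        System.out.println(repository.launch("importJob", params)); // true: "time" makes it new
    }
}
```

With `spring.batch.job.enabled=false`, a request such as `/read?fileName=people` launches the job for `people.csv` on the classpath. Each request gets a fresh `time` value, so the same file can be imported repeatedly.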