spring-batch (ItemProcessor) 数据处理过程
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring-batch (ItemProcessor) 数据处理过程相关的知识,希望对你有一定的参考价值。
Spring-batch学习总结(五)
学习目标:掌握ItemProcessor
1.ItemProcessor:spring-batch中数据处理的过程
2.ItemProcessor主要用于实现业务逻辑、数据验证、数据过滤等
3.Spring-batch为我们提供ItemProcessor<I,O>这个接口,它包含一个方法O process(I item);当process返回null时,表示该条数据被过滤掉,不会再传给ItemWriter
4.我们用代码进行演示:
例:我们读取数据库表person_buf中的数据,将其中id为奇数的数据剔除,并将读出的name转换为大写字母
首先观察数据库表数据结构:
代码:
Person
package com.dhcc.batch.batchDemo.processor;
import java.util.Date;
/**
 * Mutable bean describing one person record.
 *
 * <p>All fields are nullable wrapper or reference types and are exposed through
 * conventional getters and setters. Dates are stored and returned by reference,
 * without defensive copies.
 */
public class Person {

    private Integer id;
    private String name;
    private String perDesc;
    private Date createTime;
    private Date updateTime;
    private String sex;
    private Float score;
    private Double price;

    /** Creates an empty person; every field starts out as {@code null}. */
    public Person() {
    }

    /**
     * Creates a fully populated person. Arguments are stored as given; none is validated.
     *
     * @param id         identifier of the record
     * @param name       person name
     * @param perDesc    free-text description
     * @param createTime creation time
     * @param updateTime last update time
     * @param sex        sex marker
     * @param score      score value
     * @param price      price value
     */
    public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score,
            Double price) {
        this.id = id;
        this.name = name;
        this.perDesc = perDesc;
        this.createTime = createTime;
        this.updateTime = updateTime;
        this.sex = sex;
        this.score = score;
        this.price = price;
    }

    /** @return the record identifier, possibly {@code null} */
    public Integer getId() {
        return this.id;
    }

    /** @param id the record identifier to store */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @return the person name, possibly {@code null} */
    public String getName() {
        return this.name;
    }

    /** @param name the person name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the creation time (same instance that was stored), possibly {@code null} */
    public Date getCreateTime() {
        return this.createTime;
    }

    /** @return the description, possibly {@code null} */
    public String getPerDesc() {
        return this.perDesc;
    }

    /** @param perDesc the description to store */
    public void setPerDesc(String perDesc) {
        this.perDesc = perDesc;
    }

    /** @param createTime the creation time to store (kept by reference) */
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    /** @return the last update time (same instance that was stored), possibly {@code null} */
    public Date getUpdateTime() {
        return this.updateTime;
    }

    /** @param updateTime the last update time to store (kept by reference) */
    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }

    /** @return the sex marker, possibly {@code null} */
    public String getSex() {
        return this.sex;
    }

    /** @param sex the sex marker to store */
    public void setSex(String sex) {
        this.sex = sex;
    }

    /** @return the score, possibly {@code null} */
    public Float getScore() {
        return this.score;
    }

    /** @param score the score to store */
    public void setScore(Float score) {
        this.score = score;
    }

    /** @return the price, possibly {@code null} */
    public Double getPrice() {
        return this.price;
    }

    /** @param price the price to store */
    public void setPrice(Double price) {
        this.price = price;
    }

    /**
     * Renders all fields as {@code Person [id=..., name=..., ...]}; unset fields print as
     * {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Person [");
        text.append("id=").append(this.id);
        text.append(", name=").append(this.name);
        text.append(", perDesc=").append(this.perDesc);
        text.append(", createTime=").append(this.createTime);
        text.append(", updateTime=").append(this.updateTime);
        text.append(", sex=").append(this.sex);
        text.append(", score=").append(this.score);
        text.append(", price=").append(this.price);
        return text.append("]").toString();
    }
}
PersonLineAggregator
package com.dhcc.batch.batchDemo.processor;
import org.springframework.batch.item.file.transform.LineAggregator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Turns a {@link Person} into one line of output by serializing it as JSON.
 */
public class PersonLineAggregator implements LineAggregator<Person> {

    /**
     * JSON serializer shared by all aggregator instances. It is never reconfigured after
     * construction, so a single instance is reused instead of building one per aggregator.
     */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Serializes the given person to its JSON text.
     *
     * @param person the item to render
     * @return the JSON representation of {@code person}
     * @throws RuntimeException if Jackson cannot serialize the item; the original
     *         {@link JsonProcessingException} is kept as the cause
     */
    @Override
    public String aggregate(Person person) {
        try {
            return MAPPER.writeValueAsString(person);
        } catch (JsonProcessingException e) {
            // Include the offending item so a failed write can be traced back to its record.
            throw new RuntimeException("unable to write person as JSON: " + person, e);
        }
    }
}
PersonRowMapper
package com.dhcc.batch.batchDemo.processor;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
/**
 * Maps each row of a query result onto a {@link Person}.
 *
 * @author Administrator
 */
public class PersonRowMapper implements RowMapper<Person> {

    /**
     * Builds a {@link Person} from the current row.
     *
     * @param rs     result set positioned on the row to read
     * @param rowNum index of the current row; not used by this mapper
     * @return a new person populated from the columns {@code id}, {@code name},
     *         {@code per_desc}, {@code create_time}, {@code update_time}, {@code sex},
     *         {@code score} and {@code price}
     * @throws SQLException if any column cannot be read
     */
    @Override
    public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
        // NOTE(review): getInt/getFloat/getDouble yield 0 rather than null for SQL NULL,
        // so id, score and price are never null here - confirm that is intended.
        Integer id = rs.getInt("id");
        String name = rs.getString("name");
        String description = rs.getString("per_desc");
        // NOTE(review): getDate returns java.sql.Date, which presumably carries no
        // time-of-day; if these columns are DATETIME/TIMESTAMP, verify that is acceptable.
        java.sql.Date created = rs.getDate("create_time");
        java.sql.Date updated = rs.getDate("update_time");
        String sex = rs.getString("sex");
        Float score = rs.getFloat("score");
        Double price = rs.getDouble("price");
        return new Person(id, name, description, created, updated, sex, score, price);
    }
}
ProcessorFileApplication
package com.dhcc.batch.batchDemo.processor;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the ItemProcessor demo, annotated with
 * {@code @EnableBatchProcessing}.
 */
@SpringBootApplication
@EnableBatchProcessing
public class ProcessorFileApplication {

    /**
     * Launches the application with this class as its primary source.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        Class<?> primarySource = ProcessorFileApplication.class;
        SpringApplication.run(primarySource, args);
    }
}
ProcessorFileOutputFromDBConfiguration
package com.dhcc.batch.batchDemo.processor;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
/**
 * Batch configuration for a single-step job that reads rows from the {@code person_buf}
 * table, passes them through a chain of two processors and writes the remaining items
 * as JSON lines to a file.
 */
@Configuration
public class ProcessorFileOutputFromDBConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DataSource dataSource;
    // NOTE(review): both fields below share the type ItemProcessor<Person, Person>, so
    // injection presumably relies on the field names matching the bean names - confirm.
    @Autowired
    private ItemProcessor<Person, Person> fristNameUpperCaseProcessor;
    @Autowired
    private ItemProcessor<Person, Person> idFilterProcessor;

    /**
     * Defines the job, which consists of the single step declared below.
     *
     * @return the job named {@code ProcessorFileOutputFromDBJob}
     */
    @Bean
    public Job ProcessorFileOutputFromDBJob() {
        return jobBuilderFactory.get("ProcessorFileOutputFromDBJob")
                .start(ProcessorFileOutputFromDBStep())
                .build();
    }

    /**
     * Defines the chunk-oriented step: read, process and write in chunks of 100 items.
     *
     * @return the step named {@code ProcessorFileOutputFromDBStep}
     */
    @Bean
    public Step ProcessorFileOutputFromDBStep() {
        return stepBuilderFactory.get("ProcessorFileOutputFromDBStep")
                .<Person, Person>chunk(100)
                // The two bean method names are swapped relative to what they return:
                // "...ItemWriter" builds the reader and "...ItemReader" builds the writer.
                .reader(ProcessorFileOutputFromItemWriter())
                .processor(personDataProcessor())
                .writer(ProcessorFileOutputFromItemReader())
                .build();
    }

    /**
     * Builds the paging reader over {@code person_buf}. Despite its name, this bean is the
     * step's reader; the name is kept because it is also the bean name.
     *
     * @return a reader that selects all mapped columns ordered by ascending {@code id}
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<Person> ProcessorFileOutputFromItemWriter() {
        JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource); // data source to query
        // NOTE(review): this sets the fetch size only; the reader's page size is not set
        // here - confirm the default page size is what is wanted with a chunk size of 100.
        reader.setFetchSize(100);
        reader.setRowMapper(new PersonRowMapper()); // map each row to a Person
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // columns to select
        queryProvider.setFromClause("from person_buf"); // table to query
        Map<String, Order> sortKeys = new HashMap<String, Order>(); // sort columns used for paging
        sortKeys.put("id", Order.ASCENDING); // ascending by id
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);
        return reader;
    }

    /**
     * Chains the two injected processors: the upper-casing processor runs first, then the
     * id filter.
     *
     * @return the composite processor used by the step
     */
    @Bean
    public CompositeItemProcessor<Person, Person> personDataProcessor() {
        CompositeItemProcessor<Person, Person> processor = new CompositeItemProcessor<>();
        List<ItemProcessor<Person, Person>> listProcessor = new ArrayList<>();
        listProcessor.add(fristNameUpperCaseProcessor);
        listProcessor.add(idFilterProcessor);
        processor.setDelegates(listProcessor);
        return processor;
    }

    /**
     * Builds the file writer that emits one JSON line per person. Despite its name, this
     * bean is the step's writer; the name is kept because it is also the bean name.
     *
     * @return a writer targeting {@code D:\newPerson.json} (path built with the platform
     *         separator)
     * @throws IllegalStateException if the writer rejects its configuration; previously
     *         such a failure was only printed and a half-configured writer was returned
     */
    @Bean
    @StepScope
    public FlatFileItemWriter<Person> ProcessorFileOutputFromItemReader() {
        File path = new File("D:" + File.separator + "newPerson.json").getAbsoluteFile();
        System.out.println("file is create in :" + path);
        FlatFileItemWriter<Person> writer = new FlatFileItemWriter<Person>();
        writer.setResource(new FileSystemResource(path));
        writer.setLineAggregator(new PersonLineAggregator());
        try {
            writer.afterPropertiesSet();
        } catch (Exception e) {
            // Fail fast: a misconfigured writer must not reach the step.
            throw new IllegalStateException("FlatFileItemWriter for " + path + " is not configured correctly", e);
        }
        return writer;
    }
}
FristNameUpperCaseProcessor
package com.dhcc.batch.batchDemo.processor;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
/**
 * Processor that produces a copy of each {@link Person} with its name upper-cased.
 */
@Component
public class FristNameUpperCaseProcessor implements ItemProcessor<Person, Person> {

    /**
     * Copies the item, replacing the name with its upper-case form.
     *
     * @param item the person to transform
     * @return a new person with the same field values and an upper-cased name; a
     *         {@code null} name stays {@code null}
     */
    @Override
    public Person process(Person item) throws Exception {
        String name = item.getName();
        // A missing name is passed through instead of failing the whole step with a
        // NullPointerException. Locale.ROOT keeps the result independent of the JVM's
        // default locale (e.g. Turkish dotted/dotless i).
        String upperName = (name == null) ? null : name.toUpperCase(java.util.Locale.ROOT);
        return new Person(item.getId(), upperName, item.getPerDesc(), item.getCreateTime(),
                item.getUpdateTime(), item.getSex(), item.getScore(), item.getPrice());
    }
}
IdFilterProcessor
package com.dhcc.batch.batchDemo.processor;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
/**
 * Processor that keeps persons with an even id and drops the rest.
 */
@Component
public class IdFilterProcessor implements ItemProcessor<Person, Person> {

    /**
     * Decides whether the item continues down the processing chain.
     *
     * @param item the person to inspect; its id must not be {@code null}
     * @return the same instance when the id is even, otherwise {@code null}, which is
     *         the ItemProcessor convention for filtering an item out
     */
    @Override
    public Person process(Person item) throws Exception {
        boolean evenId = item.getId() % 2 == 0;
        return evenId ? item : null;
    }
}
运行结果:
观察写入完成后的文件:
可以看出我们已经完成了我们的目标
以上是关于spring-batch (ItemProcessor) 数据处理过程的主要内容,如果未能解决你的问题,请参考以下文章
Spring-batch:如何在 Spring Batch 中使用 skip 方法捕获异常消息?
Spring-Batch学习总结——重要概念,环境搭建,名词解释,第一个项目及异常处理
如何确保 SFTP 会话始终在 spring-batch 结束时关闭
spring-batch (java-config) 使用 JobExecutionDecider 识别和执行步骤