spring-batch (ItemProcessor) 数据处理过程

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring-batch (ItemProcessor) 数据处理过程相关的知识,希望对你有一定的参考价值。

Spring-batch学习总结(五)
学习目标:掌握ItemProcessor
1.ItemProcessor:spring-batch中数据处理的过程
2.ItemProcessor主要用于实现业务逻辑、验证、过滤等
3.Spring-batch为我们提供ItemProcessor<I,O>这个接口,它包含一个方法O process(I item)
4.我们用代码进行演示:
例:我们读取数据库表person_buf中的数据,将其id为奇数的数据剔除,将读出name进行字母大写转换
首先观察数据库表数据结构:
技术分享图片

代码:
Person

package com.dhcc.batch.batchDemo.processor;

import java.util.Date;

/**
 * Mutable bean holding the fields of one person record: id, name, description,
 * creation/update timestamps, sex, score and price.
 *
 * <p>All fields are nullable wrapper/reference types and are exposed through
 * conventional getters and setters.
 */
public class Person {
    private Integer id;
    private String name;
    private String perDesc;
    private Date createTime;
    private Date updateTime;
    private String sex;
    private Float score;
    private Double price;

    /** Creates a person with every field left {@code null}. */
    public Person() {
    }

    /**
     * Creates a fully populated person.
     *
     * @param id         record identifier
     * @param name       person name
     * @param perDesc    free-text description
     * @param createTime creation time
     * @param updateTime last update time
     * @param sex        sex
     * @param score      score
     * @param price      price
     */
    public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score,
            Double price) {
        this.id = id;
        this.name = name;
        this.perDesc = perDesc;
        this.createTime = createTime;
        this.updateTime = updateTime;
        this.sex = sex;
        this.score = score;
        this.price = price;
    }

    /** @return the record identifier */
    public Integer getId() {
        return this.id;
    }

    /** @param id the record identifier */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @return the person name */
    public String getName() {
        return this.name;
    }

    /** @param name the person name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the free-text description */
    public String getPerDesc() {
        return this.perDesc;
    }

    /** @param perDesc the free-text description */
    public void setPerDesc(String perDesc) {
        this.perDesc = perDesc;
    }

    /** @return the creation time */
    public Date getCreateTime() {
        return this.createTime;
    }

    /** @param createTime the creation time */
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    /** @return the last update time */
    public Date getUpdateTime() {
        return this.updateTime;
    }

    /** @param updateTime the last update time */
    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }

    /** @return the sex */
    public String getSex() {
        return this.sex;
    }

    /** @param sex the sex */
    public void setSex(String sex) {
        this.sex = sex;
    }

    /** @return the score */
    public Float getScore() {
        return this.score;
    }

    /** @param score the score */
    public void setScore(Float score) {
        this.score = score;
    }

    /** @return the price */
    public Double getPrice() {
        return this.price;
    }

    /** @param price the price */
    public void setPrice(Double price) {
        this.price = price;
    }

    /**
     * Renders every field as {@code Person [field=value, ...]}; unset fields
     * print as {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Person [");
        text.append("id=").append(id);
        text.append(", name=").append(name);
        text.append(", perDesc=").append(perDesc);
        text.append(", createTime=").append(createTime);
        text.append(", updateTime=").append(updateTime);
        text.append(", sex=").append(sex);
        text.append(", score=").append(score);
        text.append(", price=").append(price);
        text.append("]");
        return text.toString();
    }

}

PersonLineAggregator

package com.dhcc.batch.batchDemo.processor;

import org.springframework.batch.item.file.transform.LineAggregator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * {@link LineAggregator} that turns a {@link Person} into one line of JSON
 * text using a Jackson {@link ObjectMapper}.
 */
public class PersonLineAggregator implements LineAggregator<Person> {
    // Jackson mapper used to serialize each item.
    private final ObjectMapper mapper = new ObjectMapper();

    /**
     * Serializes the given person to a JSON string.
     *
     * @param person the item to serialize
     * @return the JSON representation produced by the mapper
     * @throws RuntimeException wrapping the {@link JsonProcessingException}
     *                          when serialization fails
     */
    @Override
    public String aggregate(Person person) {
        final String json;
        try {
            json = mapper.writeValueAsString(person);
        } catch (JsonProcessingException cause) {
            // Rethrow unchecked because the interface method declares no checked exception.
            throw new RuntimeException("unable to writer...", cause);
        }
        return json;
    }

}

PersonRowMapper

package com.dhcc.batch.batchDemo.processor;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;
/**
 * Maps each row read from the database onto a {@link Person} object.
 *
 * @author Administrator
 */
public class PersonRowMapper implements RowMapper<Person> {

    /**
     * Builds a {@link Person} from the row the result set is currently
     * positioned on.
     *
     * @param rs     result set positioned on the row to map
     * @param rowNum number of the current row (not used)
     * @return a new person populated from the columns id, name, per_desc,
     *         create_time, update_time, sex, score and price
     * @throws SQLException if a column cannot be read
     */
    @Override
    public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
        Person person = new Person();
        // NOTE(review): getInt/getFloat/getDouble yield primitives, so a SQL NULL
        // presumably surfaces as 0 rather than null here - confirm that is intended.
        person.setId(rs.getInt("id"));
        person.setName(rs.getString("name"));
        person.setPerDesc(rs.getString("per_desc"));
        // NOTE(review): getDate returns java.sql.Date; if these columns carry a
        // time of day it may be dropped - confirm the column types.
        person.setCreateTime(rs.getDate("create_time"));
        person.setUpdateTime(rs.getDate("update_time"));
        person.setSex(rs.getString("sex"));
        person.setScore(rs.getFloat("score"));
        person.setPrice(rs.getDouble("price"));
        return person;
    }

}

ProcessorFileApplication

package com.dhcc.batch.batchDemo.processor;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the ItemProcessor demo; batch processing
 * support is switched on through {@link EnableBatchProcessing}.
 */
@SpringBootApplication
@EnableBatchProcessing
public class ProcessorFileApplication {

    /**
     * Boots the Spring application using this class as the primary source.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(ProcessorFileApplication.class, args);
    }
}

ProcessorFileOutputFromDBConfiguration

package com.dhcc.batch.batchDemo.processor;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

/**
 * Batch configuration for the ItemProcessor demo: reads {@link Person} rows
 * from the {@code person_buf} table, runs them through a chain of two
 * processors (name upper-casing, then id filtering) and writes the surviving
 * items as JSON lines to a file.
 */
@Configuration
public class ProcessorFileOutputFromDBConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    // NOTE(review): several beans share the type ItemProcessor<Person, Person>,
    // so these two injections are presumably resolved by matching the field name
    // to the bean name. Do not rename either field (including the "frist" typo)
    // without adding an explicit qualifier.
    @Autowired
    private ItemProcessor<Person, Person> fristNameUpperCaseProcessor;

    @Autowired
    private ItemProcessor<Person, Person> idFilterProcessor;

    /**
     * Defines the job, which consists of the single step below.
     *
     * @return the configured job
     */
    @Bean
    public Job ProcessorFileOutputFromDBJob() {
        return jobBuilderFactory.get("ProcessorFileOutputFromDBJob")
                .start(ProcessorFileOutputFromDBStep())
                .build();

    }

    /**
     * Defines the chunk-oriented step (chunk size 100) wiring reader,
     * composite processor and writer together.
     *
     * <p>The reader/writer factory method names are swapped relative to what
     * they return ({@code ...FromItemWriter()} builds the reader and
     * {@code ...FromItemReader()} builds the writer). They are kept as-is
     * because the method names are part of this class's public interface.
     *
     * @return the configured step
     */
    @Bean
    public Step ProcessorFileOutputFromDBStep() {
        return stepBuilderFactory.get("ProcessorFileOutputFromDBStep")
                .<Person, Person>chunk(100)
                .reader(ProcessorFileOutputFromItemWriter())
                .processor(personDataProcessor())
                .writer(ProcessorFileOutputFromItemReader())
                .build();
    }

    /**
     * Builds the paging JDBC reader over {@code person_buf}, sorted by
     * {@code id} ascending. Despite its name, this method returns the step's
     * reader.
     *
     * @return the configured reader
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<Person> ProcessorFileOutputFromItemWriter() {
        JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource); // data source to query
        reader.setFetchSize(100); // fetch size passed to the reader (not the page size)
        reader.setRowMapper(new PersonRowMapper()); // maps each row to a Person object
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // columns to select
        queryProvider.setFromClause("from person_buf"); // table to query
        Map<String, Order> sortKeys = new HashMap<>(); // sort columns for paging
        sortKeys.put("id", Order.ASCENDING); // ascending by id
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);
        return reader;
    }

    /**
     * Chains the two injected processors: name upper-casing first, then the
     * id filter.
     *
     * @return the composite processor used by the step
     */
    @Bean
    public CompositeItemProcessor<Person, Person> personDataProcessor(){
        CompositeItemProcessor<Person, Person> processor=new CompositeItemProcessor<>();
        // NOTE(review): running the filter first would avoid upper-casing items
        // that are discarded anyway; order kept as originally written.
        List<ItemProcessor<Person, Person>> listProcessor=new ArrayList<>();
        listProcessor.add(fristNameUpperCaseProcessor);
        listProcessor.add(idFilterProcessor);
        processor.setDelegates(listProcessor);
        return processor;

    }

    /**
     * Builds the flat-file writer that emits one JSON line per person to
     * {@code D:\newPerson.json} (path built with {@link File#separator}).
     * Despite its name, this method returns the step's writer.
     *
     * <p>The writer is not initialised by hand here: it is returned as a
     * managed bean so that the container performs initialisation and any
     * configuration error fails bean creation, instead of being printed and
     * swallowed while a half-configured writer is returned.
     *
     * @return the configured writer
     */
    @Bean
    @StepScope
    public FlatFileItemWriter<Person> ProcessorFileOutputFromItemReader() {
        File path = new File("D:" + File.separator + "newPerson.json").getAbsoluteFile();
        System.out.println("file is create in :" + path);
        FlatFileItemWriter<Person> writer = new FlatFileItemWriter<Person>();
        writer.setResource(new FileSystemResource(path));
        writer.setLineAggregator(new PersonLineAggregator());
        return writer;
    }

}

FristNameUpperCaseProcessor

package com.dhcc.batch.batchDemo.processor;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

/**
 * Item processor that returns a copy of each {@link Person} with its name
 * converted to upper case; every other field is carried over unchanged.
 */
@Component
public class FristNameUpperCaseProcessor implements ItemProcessor<Person, Person> {

    /**
     * Produces a new person whose name is the upper-cased form of the input's
     * name.
     *
     * @param item the person to transform
     * @return a new person with the upper-cased name, or with a {@code null}
     *         name when the input has none
     * @throws Exception declared by the interface; not thrown by this method
     *                   for a non-null item
     */
    @Override
    public Person process(Person item) throws Exception {
        String name = item.getName();
        // A missing name is passed through instead of failing with a NullPointerException.
        // Locale.ROOT keeps the conversion independent of the JVM's default locale.
        String upperName = (name == null) ? null : name.toUpperCase(java.util.Locale.ROOT);
        return new Person(item.getId(), upperName, item.getPerDesc(), item.getCreateTime(),
                item.getUpdateTime(), item.getSex(), item.getScore(), item.getPrice());
    }

}

IdFilterProcessor

package com.dhcc.batch.batchDemo.processor;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

/**
 * Item processor that keeps only persons whose id is even.
 */
@Component
public class IdFilterProcessor implements ItemProcessor<Person, Person> {

    /**
     * Passes the item through when its id is even and returns {@code null}
     * when it is odd.
     *
     * @param item the person to check
     * @return the same item for an even id, otherwise {@code null}
     * @throws Exception declared by the interface
     */
    @Override
    public Person process(Person item) throws Exception {
        boolean oddId = item.getId() % 2 != 0;
        return oddId ? null : item;
    }

}

运行结果:
技术分享图片
观察写入完成后的文件:
技术分享图片
可以看出我们已经完成了我们的目标

以上是关于spring-batch (ItemProcessor) 数据处理过程的主要内容,如果未能解决你的问题,请参考以下文章

Spring-batch:如何在 Spring Batch 中使用 skip 方法捕获异常消息?

Spring-Batch学习总结——重要概念,环境搭建,名词解释,第一个项目及异常处理

如何确保 SFTP 会话始终在 spring-batch 结束时关闭

spring-batch (java-config) 使用 JobExecutionDecider 识别和执行步骤

Spring-batch - 比较来自 csv 的数据并相应地更新 Salesforce 中的记录

SPRING-BATCH 错误:没有可用于步骤范围的上下文持有者