Spring批处理在拒绝文件中保存跳过的读取器行

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring批处理在拒绝文件中保存跳过的读取器行相关的知识,希望对你有一定的参考价值。

我想在输出拒绝文件中重写跳过的读取器行,谢谢。

我的代码:

public class JobPerson {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;


    @Bean
    public ItemReader<Person> itemReader() {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        reader.setResource(new ClassPathResource("user.csv"));
        reader.setLineMapper(new DefaultLineMapper<Person>() {{
            setLineTokenizer(new DelimitedLineTokenizer(";") {{
                setNames(new String[] {"firstName", "lastName", "age" });
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person.class);
            }});
        }});
        return reader;
    }



    @Bean
    public ItemWriter<Person> itemWriter() {
        return items -> {
            int i=0;
            for (Person item : items) {
                i++;

                System.out.println(i+". Nom = " + item.getFirstName()+". Prenom = " + item.getLastName());
            }
        };
    }

    @Bean
    public ItemProcessor<Person, Person> itemProcessor() {
        return item -> {
            String stritem=item.toString();
            String[] splitArray = stritem.split(";"); // tableau de chaînes
            int lineData = splitArray.length;
            //if (lineData<2)

            if (item.equals("Eric")) {
                throw new IllegalArgumentException("Wanted!");
            }
            return item;
        };
    }

    @Bean
    public Step step() {
        return steps.get("step")
                .<Person, Person>chunk(5)
                .reader(itemReader())
                .processor(itemProcessor())
                .writer(itemWriter())
                .faultTolerant()
                .skip(IllegalArgumentException.class)
                .skipLimit(100)
                .listener(new MySkipListener())
                .skip(Exception.class)
                .build();
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(step())
                .build();
    }

    public static class MySkipListener implements SkipListener<Person, Person> {

        @Override
        public void onSkipInRead(Throwable t) {
            System.err.println("On Skip in Read Error : " + t.getMessage());
        }

        @Override
        public void onSkipInWrite(Person item, Throwable t) {
            System.out.println("Item " + item + " was skipped due to: " + t.getMessage());
        }

        @Override
        public void onSkipInProcess(Person item, Throwable t) {
            System.out.println("Item " + item + " was skipped due to: " + t.getMessage());
        }
    }
}

要重写的行(红色的行):

lines writing with red

=> On Skip in Read Error:解析错误在第2行:资源= [类路径资源[user.csv]],输入= [Eric; Bonneton;] On Skip in Read Error:解析错误在第3行:资源= [class path resource [user.csv]],input = [sdqsdqs;]

答案

在您的示例中,错误在读取期间发生,具有FlatFileParseException异常。此例外为您提供了跳过的行号和原始输入行。

所以你的跳过听众可以是这样的:

public static class MySkipListener implements SkipListener<Person, Person> {

    private FileWriter fileWriter;

    public MySkipListener(File file) throws IOException {
        this.fileWriter = new FileWriter(file);
    }

    @Override
    public void onSkipInRead(Throwable throwable) {
        if (throwable instanceof FlatFileParseException) {
            FlatFileParseException flatFileParseException = (FlatFileParseException) throwable;
            try {
                fileWriter.write(flatFileParseException.getInput());
            } catch (IOException e) {
                System.err.println("Unable to write skipped line to error file");
            }
        }
    }

    @Override
    public void onSkipInWrite(Person item, Throwable t) {
        System.out.println("Item " + item + " was skipped due to: " + t.getMessage());
    }

    @Override
    public void onSkipInProcess(Person item, Throwable t) {
        System.out.println("Item " + item + " was skipped due to: " + t.getMessage());
    }
}

然后,您可以在步骤中使用要在构造时写入跳过的行的文件配置侦听器。您还需要将FlatFileParseException声明为可跳过的异常。

希望这可以帮助。

另一答案

我添加了您的建议,但这些行未添加到输出文件中。我的新代码:

public class MyJob {

@Autowired
private JobBuilderFactory jobs;

@Autowired
private StepBuilderFactory steps;

File fo=new File("C:Usersm.younebDocumentsicdccecWorkplacesaveLinessrcmain
esourcescsvoutput.csv");
@Bean
public ItemReader<Person> itemReader() {
    FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
    reader.setResource(new ClassPathResource("user.csv"));
    reader.setLineMapper(new DefaultLineMapper<Person>() {{
        setLineTokenizer(new DelimitedLineTokenizer(";") {{
            setNames(new String[] {"firstName", "lastName", "age" });
        }});
        setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
            setTargetType(Person.class);
        }});
    }});
    return reader;
}



@Bean
public ItemWriter<Person> itemWriter() {
    return items -> {
        int i=0;
        for (Person item : items) {
            i++;

            System.out.println(i+". Nom = " + item.getFirstName()+". Prenom = " + item.getLastName());
        }
    };
}

@Bean
public ItemProcessor<Person, Person> itemProcessor() {
    return item -> {
        String stritem=item.toString();
        String[] splitArray = stritem.split(";"); // tableau de chaînes
        int lineData = splitArray.length;
        //if (lineData<2)

        if (item.equals("Eric")) {
            throw new IllegalArgumentException("Wanted!");
        }
        return item;
    };
}

@Bean
public Step step() throws IOException {
    return steps.get("step")
            .<Person, Person>chunk(5)
            .reader(itemReader())
            .processor(itemProcessor())
            .writer(itemWriter())
            .faultTolerant()
            .skip(IllegalArgumentException.class)
            .skip(FlatFileParseException.class)
            .skipLimit(100)
            .listener(new MySkipListener(fo))
            .skip(Exception.class)
            .build();
}

@Bean
public Job job() throws IOException {
    return jobs.get("job")
            .start(step())
            .build();
}

public static class MySkipListener implements SkipListener<Person, Person> {

    private FileWriter fileWriter;

    public MySkipListener(File file) throws IOException {
        this.fileWriter = new FileWriter(file);
    }

    @Override
    public void onSkipInRead(Throwable throwable) {
        if (throwable instanceof FlatFileParseException) {
            FlatFileParseException flatFileParseException = (FlatFileParseException) throwable;
            try {
                fileWriter.write(flatFileParseException.getInput());
            } catch (IOException e) {
                System.err.println("Unable to write skipped line to error file");
            }
        }
    }

    @Override
    public void onSkipInWrite(Person item, Throwable t) {
        System.out.println("Item " + item + " was skipped due to: " + t.getMessage());
    }

    @Override
    public void onSkipInProcess(Person item, Throwable t) {
        System.out.println("Item " + item + " was skipped due to: " + t.getMessage());
    }
}

}

以上是关于Spring批处理在拒绝文件中保存跳过的读取器行的主要内容,如果未能解决你的问题,请参考以下文章

python读取csv文件跳过前几行

Python Pandas,读取文件并在标题前跳过行

Spring Batch中如何读取多个CSV文件合并数据进行处理?

从文件中读取所有数据

如何在 C++ 中跳过空格

如果从计划的作业中调用,Spring Boot 存储库不会保存到数据库