Spring Batch: saving skipped reader lines to a reject file
I want to write the lines that the reader skips to an output reject file. Thanks.
My code:
public class JobPerson {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public ItemReader<Person> itemReader() {
FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
reader.setResource(new ClassPathResource("user.csv"));
reader.setLineMapper(new DefaultLineMapper<Person>() {{
setLineTokenizer(new DelimitedLineTokenizer(";") {{
setNames(new String[] {"firstName", "lastName", "age" });
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}});
}});
return reader;
}
@Bean
public ItemWriter<Person> itemWriter() {
return items -> {
int i=0;
for (Person item : items) {
i++;
System.out.println(i+". Nom = " + item.getFirstName()+". Prenom = " + item.getLastName());
}
};
}
@Bean
public ItemProcessor<Person, Person> itemProcessor() {
return item -> {
String stritem=item.toString();
String[] splitArray = stritem.split(";"); // array of strings
int lineData = splitArray.length;
//if (lineData<2)
if ("Eric".equals(item.getFirstName())) { // compare the first name, not the Person object, with the string
throw new IllegalArgumentException("Wanted!");
}
return item;
};
}
@Bean
public Step step() {
return steps.get("step")
.<Person, Person>chunk(5)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.faultTolerant()
.skip(IllegalArgumentException.class)
.skipLimit(100)
.listener(new MySkipListener())
.skip(Exception.class)
.build();
}
@Bean
public Job job() {
return jobs.get("job")
.start(step())
.build();
}
public static class MySkipListener implements SkipListener<Person, Person> {
@Override
public void onSkipInRead(Throwable t) {
System.err.println("On Skip in Read Error : " + t.getMessage());
}
@Override
public void onSkipInWrite(Person item, Throwable t) {
System.out.println("Item " + item + " was skipped due to: " + t.getMessage());
}
@Override
public void onSkipInProcess(Person item, Throwable t) {
System.out.println("Item " + item + " was skipped due to: " + t.getMessage());
}
}
}
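The lines to rewrite (the ones shown in red):
=> On Skip in Read Error : Parsing error at line: 2 in resource=[class path resource [user.csv]], input=[Eric;Bonneton;]
On Skip in Read Error : Parsing error at line: 3 in resource=[class path resource [user.csv]], input=[sdqsdqs;]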
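Answer
In your example, the errors occur during reading, with a FlatFileParseException.
This exception gives you the line number of the skipped line and the raw input line.
So your skip listener could look like this: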
public static class MySkipListener implements SkipListener<Person, Person> {
private FileWriter fileWriter;
public MySkipListener(File file) throws IOException {
this.fileWriter = new FileWriter(file);
}
@Override
public void onSkipInRead(Throwable throwable) {
if (throwable instanceof FlatFileParseException) {
FlatFileParseException flatFileParseException = (FlatFileParseException) throwable;
try {
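fileWriter.write(flatFileParseException.getInput() + System.lineSeparator()); // one rejected line per row
fileWriter.flush(); // push the buffered text to the file; the writer is never closed here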
} catch (IOException e) {
System.err.println("Unable to write skipped line to error file");
}
}
}
@Override
public void onSkipInWrite(Person item, Throwable t) {
System.out.println("Item " + item + " was skipped due to: " + t.getMessage());
}
@Override
public void onSkipInProcess(Person item, Throwable t) {
System.out.println("Item " + item + " was skipped due to: " + t.getMessage());
}
}
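You can then configure the listener in your step, passing it at construction time the file that the skipped lines should be written to. You also need to declare FlatFileParseException as a skippable exception.
Hope this helps.
Another answer
I added your suggestion, but the lines are not being added to the output file. My new code: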
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
File fo=new File("C:\\Users\\m.youneb\\Documents\\icdccec\\Workplace\\saveLines\\src\\main\\resources\\csv\\output.csv");
@Bean
public ItemReader<Person> itemReader() {
FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
reader.setResource(new ClassPathResource("user.csv"));
reader.setLineMapper(new DefaultLineMapper<Person>() {{
setLineTokenizer(new DelimitedLineTokenizer(";") {{
setNames(new String[] {"firstName", "lastName", "age" });
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}});
}});
return reader;
}
@Bean
public ItemWriter<Person> itemWriter() {
return items -> {
int i=0;
for (Person item : items) {
i++;
System.out.println(i+". Nom = " + item.getFirstName()+". Prenom = " + item.getLastName());
}
};
}
@Bean
public ItemProcessor<Person, Person> itemProcessor() {
return item -> {
String stritem=item.toString();
String[] splitArray = stritem.split(";"); // array of strings
int lineData = splitArray.length;
//if (lineData<2)
if ("Eric".equals(item.getFirstName())) { // compare the first name, not the Person object, with the string
throw new IllegalArgumentException("Wanted!");
}
return item;
};
}
@Bean
public Step step() throws IOException {
return steps.get("step")
.<Person, Person>chunk(5)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.faultTolerant()
.skip(IllegalArgumentException.class)
.skip(FlatFileParseException.class)
.skipLimit(100)
.listener(new MySkipListener(fo))
.skip(Exception.class)
.build();
}
@Bean
public Job job() throws IOException {
return jobs.get("job")
.start(step())
.build();
}
public static class MySkipListener implements SkipListener<Person, Person> {
private FileWriter fileWriter;
public MySkipListener(File file) throws IOException {
this.fileWriter = new FileWriter(file);
}
@Override
public void onSkipInRead(Throwable throwable) {
if (throwable instanceof FlatFileParseException) {
FlatFileParseException flatFileParseException = (FlatFileParseException) throwable;
try {
fileWriter.write(flatFileParseException.getInput());
} catch (IOException e) {
System.err.println("Unable to write skipped line to error file");
}
}
}
@Override
public void onSkipInWrite(Person item, Throwable t) {
System.out.println("Item " + item + " was skipped due to: " + t.getMessage());
}
@Override
public void onSkipInProcess(Person item, Throwable t) {
System.out.println("Item " + item + " was skipped due to: " + t.getMessage());
}
}
}
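A likely reason the lines never show up in the output file: FileWriter buffers what it is given, and the listener above never flushes or closes it, so a few short lines can stay in memory until the JVM exits. The sketch below is plain Java with no Spring Batch involved. RejectFile and RejectFileDemo are hypothetical names used only for illustration, and the two sample lines are the inputs from the error messages in the question. It flushes after every rejected line, so the file can be read back while the writer is still open.

```java
import java.io.Closeable;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.util.List;

public class RejectFileDemo {

    /** Hypothetical helper: appends rejected raw lines to a file, one per row. */
    static final class RejectFile implements Closeable {
        private final Writer writer;

        RejectFile(File file) throws IOException {
            // Like the listener in the question, this truncates any existing file.
            this.writer = new FileWriter(file);
        }

        void append(String rawLine) throws IOException {
            writer.write(rawLine);
            writer.write(System.lineSeparator());
            // Without this flush the text may remain in the writer's buffer.
            writer.flush();
        }

        @Override
        public void close() throws IOException {
            writer.close();
        }
    }

    public static void main(String[] args) throws IOException {
        File file = File.createTempFile("rejects", ".csv");
        file.deleteOnExit();
        try (RejectFile rejects = new RejectFile(file)) {
            rejects.append("Eric;Bonneton;");
            rejects.append("sdqsdqs;");
            // Read the file while the writer is still open: both lines are
            // already there because each append flushed.
            List<String> lines = Files.readAllLines(file.toPath());
            System.out.println(lines);
        }
    }
}
```

In the listener, the equivalent change would be to call fileWriter.flush() after each write in onSkipInRead, and to close the writer once the step has finished.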