MultiResourceItemReader 未按预期工作
Posted
技术标签:
【中文标题】MultiResourceItemReader 未按预期工作【英文标题】:MultiResourceItemReader not working as expected 【发布时间】:2021-09-08 12:22:54 【问题描述】:我一直在使用FlatFileItemReader
并一一处理文件。但我尝试使用MultiResourceItemReader
并一次提供所有文件,过滤后有3 个CSV
文件,最多50 个。
在运行Job
时,即使提供了所有文件,如果我验证结果,也只会处理 1 个文件。从CSV
文件中读取数据并保存到数据库中,验证结果后仅将1 个文件的数据保存到数据库中。我找不到我做错了什么。我的MultiResourceItemReader
代码如下:
@Bean(name = "multiItemReader")
@StepScope
public MultiResourceItemReader<CDSBrokerBOIDMappingEntity> multiResourceItemReader(@Value("#jobParameters[filenameStartPattern]") String filenameStartPattern
, @Value("#jobParameters[filenameEndPattern]") String filenameEndPattern, @Value("#jobParameters[localDirectory]") String localDirectory) throws Exception
String[] localDirectories = localDirectory.split(",");
List<Resource> inputResources = Collections.synchronizedList(new ArrayList<>());
for (String localDirectory1 : localDirectories)
try (Stream<Path> walk = Files.walk(Paths.get(localDirectory1), 1))
walk.filter(Files::isRegularFile) // is a file
.filter(p -> p.getFileName().toString().startsWith(filenameStartPattern) && p.getFileName().toString().endsWith(filenameEndPattern))
.findAny().ifPresentOrElse(f ->
log.info("CSV FILE => " + f.getFileName().toString());
inputResources.add(new FileSystemResource(f));
,
() ->
log.info("No file found");
);
catch (IOException e)
e.printStackTrace();
log.info("No. of files => "+inputResources.size());
MultiResourceItemReader<CDSBrokerBOIDMappingEntity> resourceItemReader = new MultiResourceItemReader<CDSBrokerBOIDMappingEntity>();
resourceItemReader.setResources(inputResources.toArray(Resource[]::new));
resourceItemReader.setDelegate(importReader());
resourceItemReader.setStrict(true);
return resourceItemReader;
而FlatFileItemReader
代码是:
@Bean
public FlatFileItemReader<CDSBrokerBOIDMappingEntity> importReader() throws Exception
FlatFileItemReader<CDSBrokerBOIDMappingEntity> reader = new FlatFileItemReader<>();
reader.setLinesToSkip(1);
reader.setLineMapper(new DefaultLineMapper<CDSBrokerBOIDMappingEntity>()
setLineTokenizer(new DelimitedLineTokenizer()
setNames(new String[]"BOID", "CLIENT_MEMBER_CODE", "BROKER_ID", "LAST_MODIFIED_DATE", "ISVALID");
);
setFieldSetMapper(new BeanWrapperFieldSetMapper<CDSBrokerBOIDMappingEntity>()
setTargetType(CDSBrokerBOIDMappingEntity.class);
);
);
reader.setStrict(true);
reader.afterPropertiesSet();
return reader;
作者是:
@Bean
public ItemWriter<CDSBrokerBOIDMappingEntity> writer()
// log.info("Writer current thread. ", Thread.currentThread().getName());
RepositoryItemWriter<CDSBrokerBOIDMappingEntity> writer = new RepositoryItemWriter<CDSBrokerBOIDMappingEntity>();
writer.setRepository(cdsBrokerBOIDMappingRepository);
// writer.setMethodName("save");
try
writer.afterPropertiesSet();
catch (Exception e)
e.printStackTrace();
return writer;
还有Step
和Job
:
@Bean
public Job importUserJob(MultiResourceItemReader<CDSBrokerBOIDMappingEntity> importReader, JobCompletionNotificationListener listener)
return jobBuilderFactory
.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1(importReader))
.end()
.build();
@Bean
public Step step1(@Qualifier("multiItemReader") MultiResourceItemReader<CDSBrokerBOIDMappingEntity> importReader)
return stepBuilderFactory.get("step1").<CDSBrokerBOIDMappingEntity, CDSBrokerBOIDMappingEntity>chunk(200)
.reader(importReader)
.processor(processor())
.writer(writer())
.listener(stepListener())
.taskExecutor(taskExecutor())
.build();
我尝试了多次,但只读取了 1 个文件。 代码有什么问题吗?还是我的方法不对?
【问题讨论】:
为什么会有反应性的东西? IT 不会增加任何复杂性,我怀疑这是您的实际问题(因为它会在列表中添加 1 或列表仍在添加中时停止或...)。 我确实调试了代码,反应式代码运行良好,所有三个文件都添加到列表中,然后添加到资源中。而且我也尝试过简单的for循环,结果是一样的。 如果您确定 MultiResourceItemReader 接收多个资源但只读取一个资源,那么请提供重现问题的minimal example 以便能够为您提供帮助。 @MahmoudBenHassine 我已经用普通的 for 循环替换了响应式代码。我认为这将是最小的例子。在使用 MultiResourceItemReader 之前,我使用的是 FlatFileItemReader,它在文件过滤器循环中调用 Job 时效果很好。MultiResourceItemReader
在收到多个资源时按预期工作,我提供了一个完整示例的答案。
【参考方案1】:
MultiResourceItemReader
在 Spring Batch v4.3.3 中按预期工作,这是一个简单的示例:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.PassThroughLineMapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
@Configuration
@EnableBatchProcessing
public class SO68125366
@Bean
public MultiResourceItemReader<String> itemReader()
FlatFileItemReader<String> itemReader = new FlatFileItemReaderBuilder<String>()
.name("itemReader")
.lineMapper(new PassThroughLineMapper())
.build();
MultiResourceItemReader<String> multiResourceItemReader = new MultiResourceItemReader<>();
multiResourceItemReader.setDelegate(itemReader);
Resource resource1 = new FileSystemResource("file1.txt");
Resource resource2 = new FileSystemResource("file2.txt");
multiResourceItemReader.setResources(new Resource[] resource1, resource2);
return multiResourceItemReader;
@Bean
public ItemWriter<String> itemWriter()
return items -> items.forEach(System.out::println);
@Bean
public Job job(JobBuilderFactory jobs, StepBuilderFactory steps)
return jobs.get("job")
.start(steps.get("step")
.<String, String>chunk(5)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
public static void main(String[] args) throws Exception
ApplicationContext context = new AnnotationConfigApplicationContext(SO68125366.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
有两个文件 file1.txt
和 file2.txt
分别包含 hello
和 world
,示例打印:
hello
world
这意味着MultiResourceItemReader
读取了两种资源,而不仅仅是您提到的一种。完整的示例可以在此repo 中找到。
【讨论】:
以上是关于MultiResourceItemReader 未按预期工作的主要内容,如果未能解决你的问题,请参考以下文章
如何在春季批处理中使用 MultiResourceItemReader 在读取单个 csv 后指定任务执行器以生成线程
如何将文件名输入到 Spring Batch 的 Item Reader 或 Item Processor 中?