MultiResourceItemReader 未按预期工作

Posted

技术标签:

【中文标题】MultiResourceItemReader 未按预期工作【英文标题】:MultiResourceItemReader not working as expected 【发布时间】:2021-09-08 12:22:54 【问题描述】:

我一直在使用FlatFileItemReader 并一一处理文件。但我尝试使用MultiResourceItemReader 并一次提供所有文件,过滤后有3 个CSV 文件,最多50 个。 在运行Job 时,即使提供了所有文件,如果我验证结果,也只会处理 1 个文件。从CSV 文件中读取数据并保存到数据库中,验证结果后仅将1 个文件的数据保存到数据库中。我找不到我做错了什么。我的MultiResourceItemReader 代码如下:

@Bean(name = "multiItemReader")
@StepScope
public MultiResourceItemReader<CDSBrokerBOIDMappingEntity> multiResourceItemReader(@Value("#jobParameters[filenameStartPattern]") String filenameStartPattern
        , @Value("#jobParameters[filenameEndPattern]") String filenameEndPattern, @Value("#jobParameters[localDirectory]") String localDirectory) throws Exception 

    String[] localDirectories = localDirectory.split(",");

    List<Resource> inputResources = Collections.synchronizedList(new ArrayList<>());

    for (String localDirectory1 : localDirectories)

        try (Stream<Path> walk = Files.walk(Paths.get(localDirectory1), 1)) 
            walk.filter(Files::isRegularFile)   // is a file
                    .filter(p -> p.getFileName().toString().startsWith(filenameStartPattern) && p.getFileName().toString().endsWith(filenameEndPattern))
                    .findAny().ifPresentOrElse(f -> 
                        log.info("CSV FILE => " + f.getFileName().toString());
                        inputResources.add(new FileSystemResource(f));
                    ,
                    () -> 
                        log.info("No file found");
                    );
         catch (IOException e) 
            e.printStackTrace();
        

    

    log.info("No. of files => "+inputResources.size());


    MultiResourceItemReader<CDSBrokerBOIDMappingEntity> resourceItemReader = new MultiResourceItemReader<CDSBrokerBOIDMappingEntity>();
    resourceItemReader.setResources(inputResources.toArray(Resource[]::new));
    resourceItemReader.setDelegate(importReader());
    resourceItemReader.setStrict(true);
    return resourceItemReader;

FlatFileItemReader 代码是:

@Bean
public FlatFileItemReader<CDSBrokerBOIDMappingEntity> importReader() throws Exception        

    FlatFileItemReader<CDSBrokerBOIDMappingEntity> reader = new FlatFileItemReader<>();

    reader.setLinesToSkip(1);
    reader.setLineMapper(new DefaultLineMapper<CDSBrokerBOIDMappingEntity>() 
        setLineTokenizer(new DelimitedLineTokenizer() 
            setNames(new String[]"BOID", "CLIENT_MEMBER_CODE", "BROKER_ID", "LAST_MODIFIED_DATE", "ISVALID");
        );
        setFieldSetMapper(new BeanWrapperFieldSetMapper<CDSBrokerBOIDMappingEntity>() 
            setTargetType(CDSBrokerBOIDMappingEntity.class);
        );
    );

    reader.setStrict(true);
    reader.afterPropertiesSet();

    return reader;

作者是:

@Bean
public ItemWriter<CDSBrokerBOIDMappingEntity> writer() 

    //  log.info("Writer current thread. ", Thread.currentThread().getName());

    RepositoryItemWriter<CDSBrokerBOIDMappingEntity> writer = new RepositoryItemWriter<CDSBrokerBOIDMappingEntity>();

    writer.setRepository(cdsBrokerBOIDMappingRepository);
//    writer.setMethodName("save");

    try 
        writer.afterPropertiesSet();
     catch (Exception e) 
        e.printStackTrace();
    

    return writer;

还有StepJob

@Bean
public Job importUserJob(MultiResourceItemReader<CDSBrokerBOIDMappingEntity> importReader, JobCompletionNotificationListener listener) 
    return jobBuilderFactory
            .get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(step1(importReader))
            .end()
            .build();


@Bean
public Step step1(@Qualifier("multiItemReader") MultiResourceItemReader<CDSBrokerBOIDMappingEntity> importReader) 
    return stepBuilderFactory.get("step1").<CDSBrokerBOIDMappingEntity, CDSBrokerBOIDMappingEntity>chunk(200)
            .reader(importReader)
            .processor(processor())
            .writer(writer())
            .listener(stepListener())
            .taskExecutor(taskExecutor())
            .build();

我尝试了多次,但只读取了 1 个文件。 代码有什么问题吗?还是我的方法不对?

【问题讨论】:

为什么会有反应性的东西? IT 不会增加任何复杂性,我怀疑这是您的实际问题(因为它会在列表中添加 1 或列表仍在添加中时停止或...)。 我确实调试了代码,反应式代码运行良好,所有三个文件都添加到列表中,然后添加到资源中。而且我也尝试过简单的for循环,结果是一样的。 如果您确定 MultiResourceItemReader 接收多个资源但只读取一个资源,那么请提供重现问题的minimal example 以便能够为您提供帮助。 @MahmoudBenHassine 我已经用普通的 for 循环替换了响应式代码。我认为这将是最小的例子。在使用 MultiResourceItemReader 之前,我使用的是 FlatFileItemReader,它在文件过滤器循环中调用 Job 时效果很好。 MultiResourceItemReader 在收到多个资源时按预期工作,我提供了一个完整示例的答案。 【参考方案1】:

MultiResourceItemReader 在 Spring Batch v4.3.3 中按预期工作,这是一个简单的示例:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.PassThroughLineMapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;

@Configuration
@EnableBatchProcessing
public class SO68125366 

    @Bean
    public MultiResourceItemReader<String> itemReader() 
        FlatFileItemReader<String> itemReader = new FlatFileItemReaderBuilder<String>()
                .name("itemReader")
                .lineMapper(new PassThroughLineMapper())
                .build();

        MultiResourceItemReader<String> multiResourceItemReader = new MultiResourceItemReader<>();
        multiResourceItemReader.setDelegate(itemReader);
        Resource resource1 = new FileSystemResource("file1.txt");
        Resource resource2 = new FileSystemResource("file2.txt");
        multiResourceItemReader.setResources(new Resource[] resource1, resource2);
        return multiResourceItemReader;
    

    @Bean
    public ItemWriter<String> itemWriter() 
        return items -> items.forEach(System.out::println);
    

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) 
        return jobs.get("job")
                .start(steps.get("step")
                        .<String, String>chunk(5)
                        .reader(itemReader())
                        .writer(itemWriter())
                        .build())
                .build();
    

    public static void main(String[] args) throws Exception 
        ApplicationContext context = new AnnotationConfigApplicationContext(SO68125366.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    


有两个文件 file1.txtfile2.txt 分别包含 helloworld,示例打印:

hello
world

这意味着MultiResourceItemReader 读取了两种资源,而不仅仅是您提到的一种。完整的示例可以在此repo 中找到。

【讨论】:

以上是关于MultiResourceItemReader 未按预期工作的主要内容,如果未能解决你的问题,请参考以下文章

如何在春季批处理中使用 MultiResourceItemReader 在读取单个 csv 后指定任务执行器以生成线程

如何将文件名输入到 Spring Batch 的 Item Reader 或 Item Processor 中?

常见未授权访问漏洞总结

未找到配置文件:未找到未过期的配置文件

“注意:未定义的变量”、“注意:未定义的索引”、“警告:未定义的数组键”和“注意:未定义的偏移量”使用 PHP

“注意:未定义的变量”、“注意:未定义的索引”、“警告:未定义的数组键”和“注意:未定义的偏移量”使用 PHP