如何在春季批处理中使用 MultiResourceItemReader 在读取单个 csv 后指定任务执行器以生成线程

Posted

技术标签:

【中文标题】如何在春季批处理中使用 MultiResourceItemReader 在读取单个 csv 后指定任务执行器以生成线程【英文标题】:How to specify the taskexecutor to spawn thread after reading a single csv by using MultiResourceItemReader in spring batch 【发布时间】:2020-11-20 03:32:43 【问题描述】:

尝试使用 MultiResourceItemReader 在 spring 批处理中读取多个文件,并且还有 taskExecutor 用于每个文件中的记录以在多线程中读取。假设一个文件夹中有 3 个 csv 文件, MultiResourceItemReader 应该一个一个地执行它,但是因为我有 taskExecutor ,不同的线程占用了 csv 文件,就像两个 csv 文件被同一文件夹中的线程占用并开始执行。

期望:- MultiResourceItemReader 应该读取第一个文件,然后 taskExecutor 应该产生不同的线程并执行。然后应该选择另一个文件,并由 taskExecutor 执行。

代码片段/Batch_Configuration :- @豆角,扁豆 公共步骤 Step1() 返回 stepBuilderFactory.get("Step1") .块(5) .reader(multiResourceItemReader()) .writer(作家()) .taskExecutor(taskExecutor()).throttleLimit(throttleLimit) 。建造();

@Bean
public MultiResourceItemReader<POJO> multiResourceItemReader() 

    MultiResourceItemReader<POJO> resourceItemReader = new MultiResourceItemReader<POJO>();
    ClassLoader cl = this.getClass().getClassLoader();
    ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(cl);

    Resource[] resources;
    try 
        resources = resolver.getResources("file:/temp/*.csv");
         resourceItemReader.setResources(resources);
            resourceItemReader.setDelegate(itemReader());
     catch (IOException e) 
        // TODO Auto-generated catch block
        e.printStackTrace();
    
    return resourceItemReader;

【问题讨论】:

【参考方案1】:

也许您应该检查 Partitioner (https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#partitioning) 并实现如下内容:

@Bean
public Step mainStep(StepBuilderFactory stepBuilderFactory,
                     FlatFileItemReader itemReader,
                     ListDelegateWriter listDelegateWriter,
                     BatchProperties batchProperties) 
    return stepBuilderFactory.get(Steps.MAIN)
            .<POJO, POJO>chunk(pageSize)
            .reader(unmatchedItemReader)
            .writer(listDelegateWriter)
            .build();


@Bean
public TaskExecutor jobTaskExecutor(@Value("$batch.config.core-pool-size") Integer corePoolSize,
                                    @Value("$batch.config.max-pool-size") Integer maxPoolSize) 
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(corePoolSize);
    taskExecutor.setMaxPoolSize(maxPoolSize);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;


@Bean
@StepScope
public Partitioner partitioner(@Value("#jobExecutionContext[ResourcesToRead]") String[] resourcePaths,
                               @Value("$batch.config.grid-size") Integer gridSize) 
    Resource[] resourceList = Arrays.stream(resourcePaths)
            .map(FileSystemResource::new)
            .toArray(Resource[]::new);
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    partitioner.setResources(resourceList);
    partitioner.partition(gridSize);
    return partitioner;


@Bean
public Step masterStep(StepBuilderFactory stepBuilderFactory, Partitioner partitioner, Step mainStep, TaskExecutor jobTaskExecutor) 
    return stepBuilderFactory.get(BatchConstants.MASTER)
            .partitioner(mainStep)
            .partitioner(Steps.MAIN, partitioner)
            .taskExecutor(jobTaskExecutor)
            .build();

【讨论】:

以上是关于如何在春季批处理中使用 MultiResourceItemReader 在读取单个 csv 后指定任务执行器以生成线程的主要内容,如果未能解决你的问题,请参考以下文章

春季启动批处理到具有多个作业的春季云任务

春季批处理作业未读取第一行

在春季批处理部署程序分区处理程序中定期获取正在运行的工作节点的状态

无法在春季批处理中序列化此事务的访问

春季批处理 - 传输文件

在春季批处理(spring-boot-1.5.2.RELEASE)中使用多个数据源在启动时引发异常