如何在春季批处理中使用 MultiResourceItemReader 在读取单个 csv 后指定任务执行器以生成线程
Posted
技术标签:
【中文标题】如何在春季批处理中使用 MultiResourceItemReader 在读取单个 csv 后指定任务执行器以生成线程【英文标题】:How to specify the taskexecutor to spawn thread after reading a single csv by using MultiResourceItemReader in spring batch 【发布时间】:2020-11-20 03:32:43 【问题描述】:尝试使用 MultiResourceItemReader 在 spring 批处理中读取多个文件,并且还有 taskExecutor 用于每个文件中的记录以在多线程中读取。假设一个文件夹中有 3 个 csv 文件, MultiResourceItemReader 应该一个一个地执行它,但是因为我有 taskExecutor ,不同的线程占用了 csv 文件,就像两个 csv 文件被同一文件夹中的线程占用并开始执行。
期望:- MultiResourceItemReader 应该读取第一个文件,然后 taskExecutor 应该产生不同的线程并执行。然后应该选择另一个文件,并由 taskExecutor 执行。
代码片段/Batch_Configuration :-
@豆角,扁豆
公共步骤 Step1()
返回 stepBuilderFactory.get("Step1")
.
@Bean
public MultiResourceItemReader<POJO> multiResourceItemReader()
MultiResourceItemReader<POJO> resourceItemReader = new MultiResourceItemReader<POJO>();
ClassLoader cl = this.getClass().getClassLoader();
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(cl);
Resource[] resources;
try
resources = resolver.getResources("file:/temp/*.csv");
resourceItemReader.setResources(resources);
resourceItemReader.setDelegate(itemReader());
catch (IOException e)
// TODO Auto-generated catch block
e.printStackTrace();
return resourceItemReader;
【问题讨论】:
【参考方案1】:也许您应该检查 Partitioner (https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#partitioning) 并实现如下内容:
@Bean
public Step mainStep(StepBuilderFactory stepBuilderFactory,
FlatFileItemReader itemReader,
ListDelegateWriter listDelegateWriter,
BatchProperties batchProperties)
return stepBuilderFactory.get(Steps.MAIN)
.<POJO, POJO>chunk(pageSize)
.reader(unmatchedItemReader)
.writer(listDelegateWriter)
.build();
@Bean
public TaskExecutor jobTaskExecutor(@Value("$batch.config.core-pool-size") Integer corePoolSize,
@Value("$batch.config.max-pool-size") Integer maxPoolSize)
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(corePoolSize);
taskExecutor.setMaxPoolSize(maxPoolSize);
taskExecutor.afterPropertiesSet();
return taskExecutor;
@Bean
@StepScope
public Partitioner partitioner(@Value("#jobExecutionContext[ResourcesToRead]") String[] resourcePaths,
@Value("$batch.config.grid-size") Integer gridSize)
Resource[] resourceList = Arrays.stream(resourcePaths)
.map(FileSystemResource::new)
.toArray(Resource[]::new);
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
partitioner.setResources(resourceList);
partitioner.partition(gridSize);
return partitioner;
@Bean
public Step masterStep(StepBuilderFactory stepBuilderFactory, Partitioner partitioner, Step mainStep, TaskExecutor jobTaskExecutor)
return stepBuilderFactory.get(BatchConstants.MASTER)
.partitioner(mainStep)
.partitioner(Steps.MAIN, partitioner)
.taskExecutor(jobTaskExecutor)
.build();
【讨论】:
以上是关于如何在春季批处理中使用 MultiResourceItemReader 在读取单个 csv 后指定任务执行器以生成线程的主要内容,如果未能解决你的问题,请参考以下文章