为啥我的 Spring Batch 多线程步骤在任何处理之前执行所有读取?

Posted

技术标签:

【中文标题】为啥我的 Spring Batch 多线程步骤在任何处理之前执行所有读取?【英文标题】:Why is my Spring Batch multi-threaded step executing all reads before any processing?为什么我的 Spring Batch 多线程步骤在任何处理之前执行所有读取? 【发布时间】:2020-09-17 09:06:58 【问题描述】:

我正在尝试编写一个 Spring Batch 流程,用于将具有庞大架构的遗留数据库中的数百万个条目转换为简化的 JSON 格式,并将该 JSON 发布到 GCP PubSub。为了使这个过程尽可能高效,我正在尝试利用 Spring-Batch 多线程 Step。

为了测试我的进程,我从小处着手,页面大小和块大小为 5,总共处理 20 个条目的限制,以及只有 1 个线程的线程池。我正在尝试逐步完成该过程以验证它是否按预期工作 - 但事实并非如此。

我预计将我的 RepositoryItemReader 配置为页面大小为 5 会导致它从数据库中仅读取 5 条记录 - 然后在读取下一个 5 之前以 5 的单个块处理这些记录。但这不是正在发生的事情.相反,在日志中,由于我启用了 hibernate show-sql,我可以看到阅读器在任何处理开始之前读取了所有 20 条记录。

为什么我的多线程步骤在执行任何处理之前执行所有读取?我是不是配置错了?显然,我不希望我的 Job 在开始处理任何内容之前尝试将数百万个 DTO 加载到内存中......

这是我的工作配置方式:

@Configuration
public class ConversionBatchJobConfig 

    @Bean
    public SimpleCompletionPolicy processChunkSize(@Value("$commit.chunk.size:5") Integer chunkSize) 
        return new SimpleCompletionPolicy(chunkSize);
    

    @Bean
    @StepScope
    public ItemStreamReader<DbProjection> dbReader(
            MyDomainRepository myDomainRepository,
            @Value("#jobParameters[pageSize]") Integer pageSize, //pageSize and chunkSize both 5 for now
            @Value("#jobParameters[limit]") Integer limit)  //limit is 40
        RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
        myDomainRepositoryReader.setRepository(myDomainRepository);
        myDomainRepositoryReader.setMethodName("findActiveDbDomains"); //A native query
        myDomainRepositoryReader.setArguments(new ArrayList<Object>() 
            add("ACTIVE");
        );
        myDomainRepositoryReader.setSort(new HashMap<String, Sort.Direction>() 
            put("update_date", Sort.Direction.ASC);
        );
        myDomainRepositoryReader.setPageSize(pageSize);
        myDomainRepositoryReader.setMaxItemCount(limit);
        myDomainRepositoryReader.setSaveState(false);
        return myDomainRepositoryReader;
    

    @Bean
    @StepScope
    public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalService dataRetrievalService) 
        return new DbProjectionToJsonMessageConverter(dataRetrievalService);
    

    @Bean
    @StepScope
    public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) 
        return new JsonMessageWriter(publisherService);
    

    @Bean
    public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
                                  ItemStreamReader<DbProjection> dbReader,
                                  ItemProcessor<DbProjection, JsonMessage> dataConverter,
                                  ItemWriter<JsonMessage> jsonPublisher,
                                  StepBuilderFactory stepBuilderFactory,
                                  TaskExecutor conversionThreadPool,
                                  @Value("$conversion.failure.limit:20") int maximumFailures) 
        return stepBuilderFactory.get("conversionProcess")
                .<DbProjection, JsonMessage>chunk(processChunkSize)
                .reader(dbReader)
                .processor(dataConverter)
                .writer(jsonPublisher)
                .faultTolerant()
                .skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
                            //  ^ for now this returns true for everything until 20 failures
                    .listener(new MyConversionSkipListener(processStatus))
                              //  ^ for now this just logs the error
                .taskExecutor(conversionThreadPool)
                .build();
    

    @Bean
    public Job conversionJob(Step conversionProcess,
                             JobBuilderFactory jobBuilderFactory) 
        return jobBuilderFactory.get("conversionJob")
                .start(conversionProcess)
                .build();
    

【问题讨论】:

【参考方案1】:

您需要检查hibernate.jdbc.fetch_size 的值并进行相应设置。

pageSizefetchSize 是不同的参数。您可以在此处找到有关差异的更多详细信息:https://***.com/a/58058009/5019386。因此,在您的情况下,如果 fetchSize 大于 pageSize,则获取的记录可能多于页面大小。

【讨论】:

以上是关于为啥我的 Spring Batch 多线程步骤在任何处理之前执行所有读取?的主要内容,如果未能解决你的问题,请参考以下文章

Spring Batch 事务管理 - 多线程步骤

Spring Batch 在流程中有两个步骤。为啥第二步永远不会运行,第一步是无限循环

为啥 Spring 批处理多线程步骤会破坏 ItemProcessor 中的其他存储库休眠查询?

Spring Batch 并行读取数据库

为啥 Spring Batch 为每个线程使用 1 个数据库连接?

Spring Batch 异步处理器配置以获得最佳性能