在阅读器中为 Spring 批处理作业实现 Keyset 分页

Posted

技术标签:

【中文标题】在阅读器中为 Spring 批处理作业实现 Keyset 分页【英文标题】:Implement Keyset paging in a reader for a spring batch job 【发布时间】:2021-05-24 13:26:56 【问题描述】:

我有一个春季批处理作业,它从 Postgres 数据库中获取数据,并在处理后将其写入 Excel 工作表。但我想在阅读器中为春季批处理作业实现键集分页。目前,我正在使用使用限制偏移分页的 JpaPagingItemReader,但是因为我正在处理大量数据,所以 JpaPagingItemReader 用于获取数据的查询随着偏移量的增加而变得低效。键集分页可用于避免限制偏移分页的限制,但我不知道如何使用键集分页实现阅读器。如何实现?

编辑: 键集分页不包括记录的偏移/跳过,相反,我们将在结果中排​​序和跟踪数字唯一标识符,并请求大于最后一个唯一条目的条目。在这种方法中,SQL 将如下所示(假设 customer_id 是记录的唯一自动生成标识符)

select * from CUSTOMERS where status = 'ACTIVE' and customer_id > 0 order by customer_id asc limit 100;

-- Second iteration ( size = 100, lastCustomerId = 100 ) 
select * from CUSTOMERS where status = 'ACTIVE' and customer_id > 100 order by customer_id asc limit 100;

-- Second iteration ( size = 100, lastCustomerId = 200 ) 
select * from CUSTOMERS where status = 'ACTIVE' and customer_id > 200 order by customer_id asc limit 100;

在实现键集分页时需要牢记的几点是:

    每条记录都应该有一个数字唯一标识符( 最好是主键) 结果集应该是有序的 我们应该有逻辑来排序并在检索到的列表中找到最大的 id。 您正在使用的标识符字段上应该有一个索引 偷看。
public class CustomerProcessorService 
    public void processCustomers() 
        List<Customer> customers = new ArrayList();
        long lastCusId = 0;
        int size = 100;
        while ( true ) 
            // Create a PageRequest object that will be passed as Pageable interface to repo
            // Note that here we are setting 0 as the offset
            PageRequest pageRequest = new PageRequest(0,size);
            // Get the lastCusId 
            lastCusId = getLastCusId(customers);
            // Get the data from the database
            customers = customerRepository.findByStatusAndCustomerIdGreaterThanOrderByCustomerIdAsc('ACTIVE',lastCusId,pageRequest);
            // Check if data is there
            if ( customers == null || customers.isEmpty()) 
                break;
            
            // Do the processing
         
    
    public Long getLastCusId(List<Customer> customers) 
        // If passed entry is null or empty, return 0 ( This handles the first iteration case ) 
        if ( customers == null || customers.isEmpty()) 
            return 0l;
        
        // Do the logic to sort the customers list by customer_id of each
        // Customer object
        // Return the last entry 
        return customers.get(customers.size() -1).getCustomerId();
    

【问题讨论】:

您能解释一下什么是键集分页吗? 没有 Spring Batch,你会怎么做呢?如果你分享你的代码,我可以帮你创建一个 Spring Batch 阅读器。 @MahmoudBenHassine 我已经添加了一个简短的解释以及实现键集分页的虚拟代码。我们如何在 spring 批处理阅读器中使用它? 好的,感谢您的更新。我添加了一个您需要编译/调整的示例的答案。希望对您有所帮助。 @MahmoudBenHassine 感谢您的回答。我还有一个疑问,如果它是 UUID,lastCusId 的初始值应该是什么 如果您的 customerId 是 UUID,则说明中的第 1 点和第 2 点不再存在,并且这种“键集分页”技术将不再起作用。您需要找到另一个标准来对数据进行排序/分页。 【参考方案1】:

您应该能够通过扩展AbstractPaginatedDataItemReader 类来实现分页逻辑。该基类处理大部分分页样板文件,并允许您在doPageRead 中指定分页逻辑。这是一个简单的例子,我会让你相应地调整它:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.data.domain.PageRequest;

public class KeySetPagingItemReader extends AbstractPaginatedDataItemReader<Customer> 

    long lastCusId = 0;
    int size = 100;
    private CustomerRepository customerRepository;
    List<Customer> customers = new ArrayList();

    public KeySetPagingItemReader(CustomerRepository customerRepository) 
        this.customerRepository = customerRepository;
    

    @Override
    protected Iterator<Customer> doPageRead() 
        PageRequest pageRequest = PageRequest.of(0,size);
        // Get the lastCusId
        lastCusId = getLastCusId(customers);
        // Get the data from the database
        customers = customerRepository.findByStatusAndCustomerIdGreaterThanOrderByCustomerIdAsc('ACTIVE',lastCusId,pageRequest);
        return customers.iterator();
    

    public Long getLastCusId(List<Customer> customers) 
        // If passed entry is null or empty, return 0 ( This handles the first iteration case )
        if ( customers == null || customers.isEmpty())
            return 0l;

        // Do the logic to sort the customers list by customer_id of each
        // Customer object
        // Return the last entry
        return customers.get(customers.size() -1).getCustomerId();
    

【讨论】:

我们也可以在这里使用 AbstractPagingItemReader 吗? AbstractPaginatedDataItemReader 和 AbstractPagingItemReader 有什么区别? 是的,你可以。这些确实是相似的,但是它们在处理分页的方式上有细微的差别。 AbstractPaginatedDataItemReader 是按照 Spring Data 处理分页的方式设计的。

以上是关于在阅读器中为 Spring 批处理作业实现 Keyset 分页的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spring Cloud Data Flow 中为 Spring Batch 作业设置调度程序?

Spring Batch Kafka Kafka 到数据库作业

Spring batch - 项目阅读器知道阅读器处理的最后一条记录

查询以包括 spring 批处理作业参数值以及作业执行数据

Spring批量分页阅读器和异常处理

如何在 Spring 中为计划进程验证系统用户?