在阅读器中为 Spring 批处理作业实现 Keyset 分页
Posted
技术标签:
【中文标题】在阅读器中为 Spring 批处理作业实现 Keyset 分页【英文标题】:Implement Keyset paging in a reader for a spring batch job 【发布时间】:2021-05-24 13:26:56 【问题描述】:我有一个春季批处理作业,它从 Postgres 数据库中获取数据,并在处理后将其写入 Excel 工作表。但我想在阅读器中为春季批处理作业实现键集分页。目前,我正在使用使用限制偏移分页的 JpaPagingItemReader,但是因为我正在处理大量数据,所以 JpaPagingItemReader 用于获取数据的查询随着偏移量的增加而变得低效。键集分页可用于避免限制偏移分页的限制,但我不知道如何使用键集分页实现阅读器。如何实现?
编辑: 键集分页不包括记录的偏移/跳过,相反,我们将在结果中排序和跟踪数字唯一标识符,并请求大于最后一个唯一条目的条目。在这种方法中,SQL 将如下所示(假设 customer_id 是记录的唯一自动生成标识符)
select * from CUSTOMERS where status = 'ACTIVE' and customer_id > 0 order by customer_id asc limit 100;
-- Second iteration ( size = 100, lastCustomerId = 100 )
select * from CUSTOMERS where status = 'ACTIVE' and customer_id > 100 order by customer_id asc limit 100;
-- Second iteration ( size = 100, lastCustomerId = 200 )
select * from CUSTOMERS where status = 'ACTIVE' and customer_id > 200 order by customer_id asc limit 100;
在实现键集分页时需要牢记的几点是:
-
每条记录都应该有一个数字唯一标识符(
最好是主键)
结果集应该是有序的
我们应该有逻辑来排序并在检索到的列表中找到最大的 id。
您正在使用的标识符字段上应该有一个索引
偷看。
public class CustomerProcessorService
public void processCustomers()
List<Customer> customers = new ArrayList();
long lastCusId = 0;
int size = 100;
while ( true )
// Create a PageRequest object that will be passed as Pageable interface to repo
// Note that here we are setting 0 as the offset
PageRequest pageRequest = new PageRequest(0,size);
// Get the lastCusId
lastCusId = getLastCusId(customers);
// Get the data from the database
customers = customerRepository.findByStatusAndCustomerIdGreaterThanOrderByCustomerIdAsc('ACTIVE',lastCusId,pageRequest);
// Check if data is there
if ( customers == null || customers.isEmpty())
break;
// Do the processing
public Long getLastCusId(List<Customer> customers)
// If passed entry is null or empty, return 0 ( This handles the first iteration case )
if ( customers == null || customers.isEmpty())
return 0l;
// Do the logic to sort the customers list by customer_id of each
// Customer object
// Return the last entry
return customers.get(customers.size() -1).getCustomerId();
【问题讨论】:
您能解释一下什么是键集分页吗? 没有 Spring Batch,你会怎么做呢?如果你分享你的代码,我可以帮你创建一个 Spring Batch 阅读器。 @MahmoudBenHassine 我已经添加了一个简短的解释以及实现键集分页的虚拟代码。我们如何在 spring 批处理阅读器中使用它? 好的,感谢您的更新。我添加了一个您需要编译/调整的示例的答案。希望对您有所帮助。 @MahmoudBenHassine 感谢您的回答。我还有一个疑问,如果它是 UUID,lastCusId 的初始值应该是什么 如果您的 customerId 是 UUID,则说明中的第 1 点和第 2 点不再存在,并且这种“键集分页”技术将不再起作用。您需要找到另一个标准来对数据进行排序/分页。 【参考方案1】:您应该能够通过扩展AbstractPaginatedDataItemReader
类来实现分页逻辑。该基类处理大部分分页样板文件,并允许您在doPageRead
中指定分页逻辑。这是一个简单的例子,我会让你相应地调整它:
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.data.domain.PageRequest;
public class KeySetPagingItemReader extends AbstractPaginatedDataItemReader<Customer>
long lastCusId = 0;
int size = 100;
private CustomerRepository customerRepository;
List<Customer> customers = new ArrayList();
public KeySetPagingItemReader(CustomerRepository customerRepository)
this.customerRepository = customerRepository;
@Override
protected Iterator<Customer> doPageRead()
PageRequest pageRequest = PageRequest.of(0,size);
// Get the lastCusId
lastCusId = getLastCusId(customers);
// Get the data from the database
customers = customerRepository.findByStatusAndCustomerIdGreaterThanOrderByCustomerIdAsc('ACTIVE',lastCusId,pageRequest);
return customers.iterator();
public Long getLastCusId(List<Customer> customers)
// If passed entry is null or empty, return 0 ( This handles the first iteration case )
if ( customers == null || customers.isEmpty())
return 0l;
// Do the logic to sort the customers list by customer_id of each
// Customer object
// Return the last entry
return customers.get(customers.size() -1).getCustomerId();
【讨论】:
我们也可以在这里使用 AbstractPagingItemReader 吗? AbstractPaginatedDataItemReader 和 AbstractPagingItemReader 有什么区别? 是的,你可以。这些确实是相似的,但是它们在处理分页的方式上有细微的差别。 AbstractPaginatedDataItemReader 是按照 Spring Data 处理分页的方式设计的。以上是关于在阅读器中为 Spring 批处理作业实现 Keyset 分页的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spring Cloud Data Flow 中为 Spring Batch 作业设置调度程序?
Spring Batch Kafka Kafka 到数据库作业