春季批处理不处理所有记录
Posted
技术标签:
【中文标题】春季批处理不处理所有记录【英文标题】:spring batch not processing all records 【发布时间】:2020-02-12 23:52:48 【问题描述】:我正在使用 Spring Batch 从使用 RepositoryItemReader 的 postgresql DB 读取记录,然后将其写入主题。 我看到大约有 100 万条记录需要处理,但它没有处理所有记录。 我已将阅读器的 pageSize 设置为 10,000,并且与提交间隔(块大小)相同
@Bean
public TaskletStep broadcastProductsStep()
return stepBuilderFactory.get("broadcastProducts")
.<Product, Product> chunk(10000)
.reader(productsReader.repositoryItemReader())
.processor(productsProcessor)
.writer(compositeItemWriter)
.faultTolerant()
.skip(Exception.class)
.skipLimit(100000)
.processorNonTransactional()
.listener(new SkipListenerProducts())
.listener(productsChunkListener)
.build();
@Bean
public RepositoryItemReader repositoryItemReader()
RepositoryItemReader<Product> repositoryReader = new RepositoryItemReader<>();
try
repositoryReader.setRepository(skuRepository);
repositoryReader.setMethodName("findByIsUpdatedAndStatusCodeIn");
repositoryReader.setPageSize(10000);
repositoryReader.setSaveState(false);
List<List<String>> arguments = new ArrayList<>();
arguments.add(Stream.of(SkuStatus.RELEASED.getValue().toString(), SkuStatus.BLOCKED.getValue().toString(),
SkuStatus.DISCONTINUED.getValue().toString())
.collect(Collectors.toList()));
repositoryReader.setArguments(arguments);
Map sorts = new HashMap();
sorts.put("catalog_number", Sort.Direction.ASC);
repositoryReader.setSort(sorts);
repositoryReader.afterPropertiesSet();
catch (Exception exception)
exception.printStackTrace();
return repositoryReader;
@Query(value = "SELECT * FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
countQuery = "SELECT COUNT(*) FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
nativeQuery = true)
public Page<Product> findByIsUpdatedAndStatusCodeIn(@Param(value = "statusCode") List<String> statusCode,
Pageable pageable);
【问题讨论】:
写完主题后你会更改 IS_UPDATED 列吗? 是的...稍后在 writer 中将其修改为 false。 【参考方案1】:问题可能是您在阅读器查询 (IS_UPDATED) 的条件上混合了分页和更新。
页面大小 = db 中的 2 行和 6 行的示例
A IS_UPDATED=true B IS_UPDATED=true C IS_UPDATED=true D IS_UPDATED=true E IS_UPDATED=true F IS_UPDATED=true第一次阅读页面 = 1 返回行 A 和 B
编写器执行后(将 A 和 B 的 IS_UPDATED 设置为 false),我们在 db 中:
C IS_UPDATED=true D IS_UPDATED=true E IS_UPDATED=true F IS_UPDATED=true第二次阅读将移至第 2 页,因此它将采用 E & F 而不是 C & D
要么:
-
您不应更新 IS_UPDATED 列。
或者您创建
RepositoryItemReader
的子类并在其中覆盖getPage
@Override
public int getPage()
return 0;
选项 2 对批处理崩溃/错误更具弹性,但您必须确保在您的编写器中始终将 IS_UPDATED 设置为 false,否则阅读器将无限期循环。
如果您使用多线程步骤,选项 2 还需要更多调整。
【讨论】:
非常感谢@benbenw。这有帮助。 @Goni_code_love ,如果这真的有帮助,你可以投票给答案。 @benbenw 您不能在 RepositoryItemReader 中覆盖 getPage()。它不存在。以上是关于春季批处理不处理所有记录的主要内容,如果未能解决你的问题,请参考以下文章
未找到产品名称的数据库类型:春季批处理中的 [Apache Hive]