带有状态更改的 Spring Batch 查询

Posted

技术标签:

【中文标题】带有状态更改的 Spring Batch 查询【英文标题】:Spring Batch querying with state changes 【发布时间】:2018-03-01 01:10:19 【问题描述】:

我正在使用带有 Spring Data JPA 和 Spring Batch 的 Spring Boot 1.5.7。我使用JpaPagingItemReader<T> 读取实体并使用JpaItemWriter<T> 写入它们。我的目标是从某个数据库表中读取数据,将它们转换为不同的格式并将它们写回不同的表(我读取原始 json 字符串,将它们反序列化并将它们插入到它们的特定表中)。

我不打算在处理完读取的数据后删除它们,而是只想将它们标记为已处理。问题是,JpaPagingItemReader 处理读取是否良好,如果我对以下内容进行查询:

    @Bean
    public ItemReader<RdJsonStore> reader()
        JpaPagingItemReader<RdJsonStore> reader = new JpaPagingItemReader<>();
        reader.setEntityManagerFactory(entityManagerFactory);
        reader.setQueryString("select e from RdJsonStore e "+
                              "where e.jsonStoreProcessedPointer is null");
        reader.setPageSize(rawDataProperties.getBatchProcessingSize());
        return reader;
    

所以它只有在没有指向它的指针时才会读取。我会在处理一个条目后插入一个指针(分批,比如我处理 1000 个条目并将它们的所有 id 发布到指针表中)。

如果我像这样更改运行时返回的数据,ItemWriter(和 JPA 的)能否处理读取的数据(它尝试查询的条目每批都会减少)?

如果指针解决方案不适用,我应该如何设计 DB-to-DB 批处理作业?

我的源表如下所示:

【问题讨论】:

不,它不能,底层数据发生变化,对于每个页面重新执行查询,您将开始丢失数据。 @M.Deinum 这就是我的想法:( 【参考方案1】:

如果您查看 JpaPagingItemReader 的代码,对于方法 doReadPage() ,您会注意到这一行,

Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());

createQuery() 在哪里,

private Query createQuery() 
        if (queryProvider == null) 
            return entityManager.createQuery(queryString);
        
        else 
            return queryProvider.createQuery();
        
    

因此,您会看到为每个页面重新创建/执行查询,但不会根据新数据集重新计算页码,并且重新计算页码也没有意义。

getPageSize() 总是返回配置中设置的值,getPage() 返回最后计算的页码(先前处理的页码 + 1),因此如果数据正在缩小,如果页码计算也重新完成,您的程序将正常工作,即您总是从 page = 0 开始,JpaPagingItemReader 不会发生这种情况,因此您将丢失由 M Deinum 在 cmets 中指定的数据。

另外,根据我的理解,添加新数据可以正常工作(前提是在最后根据排序键添加新记录,即使在作业运行期间通常假定 数据锁定) .

我认为,在当前作业运行期间将一行标记为 PROCESSED 没有任何意义,因为框架已经处理了这一点(因为记录没有被处理两次)。

您可能需要为 Next Job Run 将记录标记为 PROCESSED,这可以通过更新不属于 WHERE 子句的单独标志来处理(在作业运行期间),然后在作业结束时 - 更新一个标志,它是 WHERE 子句的一部分(您在 WHERE 子句中使用它来指示已处理的记录)。

【讨论】:

我想标记它们,因为我的工作将由用户手动启动。如果他们再次向相同的资源启动 Job,他们可能会复制数据。但我决定在写作时处理这个问题,而不是在阅读时处理。

以上是关于带有状态更改的 Spring Batch 查询的主要内容,如果未能解决你的问题,请参考以下文章

在 Spring Boot 应用程序中嵌入带有作业定义的 Spring Batch Admin

Spring Batch 作业状态未在数据库中更新

带有 sortKeys 和参数值的 Spring Batch Paging

重试不使用带有 Java Config 的 Spring Batch

记录 Spring Batch 执行的 SQL 查询

Spring Batch 使用带有 Spring Boot 的 MongoDB 抛出无法确定数据库类型的嵌入式数据库驱动程序类 NONE