用于动态块大小的 Spring Batch 自定义完成策略

Posted

技术标签:

【中文标题】用于动态块大小的 Spring Batch 自定义完成策略【英文标题】:Spring Batch custom completion policy for dynamic chunk size 【发布时间】:2016-09-20 07:27:58 【问题描述】:

上下文

我们有一个批处理作业,将本地化的国家名称(即将国家名称翻译成不同的语言)从外部复制到我们的数据库中。这个想法是在一个块中处理单个国家的所有本地化国家名称(即第一个块 - 安道尔的所有翻译,下一个块 - 阿联酋的所有翻译等)。我们使用JdbcCursorItemReader 读取外部数据 + 一些 oracle 分析函数来提供该国家可用的翻译总数:类似于

select country_code, language_code, localized_name, COUNT(1) OVER(PARTITION BY c_lng.country_code) as lng_count
from EXT_COUNTRY_LNG c_lng
order by c_lng.countty_code, c_lng.language_code

问题

因此,按块切割此输入看起来很简单:当您读取lng_count 中指定的确切行数时停止块并从下一个读取行开始一个新的,但实际上看起来并不那么简单: (

首先要尝试的是自定义完成策略。但问题是,它无权访问ItemReader 读取的最后一项——您应该明确地将其置于阅读器的上下文中,并将其重新置于策略中。不喜欢它,因为它需要额外的阅读器修改/添加阅读器侦听器。此外,我不喜欢来回序列化/反序列化相同的项目。而且我觉得JobContext/StepContext 不是存放此类数据的好地方。

还有 RepeatContext 看起来是存放此类数据的更好地方,但我无法轻松地找到它...

所以最后我们得到了这样的解决方案:

@Bean(name = "localizedCountryNamesStep")
@JobScope
public Step insertCountryStep(
        final StepBuilderFactory stepBuilderFactory,
        final MasterdataCountryNameReader countryNameReader,
        final MasterdataCountryNameProcessor countryNameProcessor,
        final MasterdataCountryNameWriter writer) 
    /* Use the same fixed-commit policy, but update it's chunk size dynamically */
    final SimpleCompletionPolicy policy = new SimpleCompletionPolicy();
    return stepBuilderFactory.get("localizedCountryNamesStep")
            .<ExtCountryLng, LocalizedCountryName> chunk(policy)
            .reader(countryNameReader)
            .listener(new ItemReadListener<ExtCountryLng>() 

                @Override
                public void beforeRead() 
                    // do nothing
                

                @Override
                public void afterRead(final ExtCountryLng item) 
                    /* Update the cunk size after every read: consequent reads 
                    inside the same country = same chunk do nothing since lngCount is always the same there */
                    policy.setChunkSize(item.getLngCount());
                

                @Override
                public void onReadError(final Exception ex) 
                    // do nothing
                
            )
            .processor(countryNameProcessor)
            .writer(writer)
            .faultTolerant()
            .skip(RuntimeException.class)
            .skipLimit(Integer.MAX_VALUE) // Batch does not support unlimited skip
            .retryLimit(0) // this solution disables only retry, but not recover
            .build();

它工作正常,只需要最少的代码更改,但对我来说还是有点难看。所以我想知道,当ItemReader 上已经提供了所有必需的信息时,是否还有另一种优雅的方法可以在 Spring Batch 中执行动态块大小?

【问题讨论】:

afterRead 听起来不是更改块大小的正确位置,我会将其放在 afterWrite 中以对下一个块生效 从逻辑上讲afterWrite 听起来是对的,但是 1)在编写一个不带额外数据库查询的卡盘后,您没有该信息 2)第一个块的大小仍应以某种方式确定 - 另一个额外的数据库查询? 您是否在处理之前清除了目标表?还是这只是一份一次性工作? @DeanClark ,不,这是一个全面的“和解”:插入新记录,更新更新记录,删除删除记录。这就是为什么必须一次向作者提供与单个国家/地区相关的所有本地化国家/地区名称。 【参考方案1】:

最简单的方法是简单地按国家/地区划分。这样,每个国家/地区都会有自己的步伐,您还可以跨国家/地区进行线程以提高性能。

如果需要单个阅读器,您可以包装一个委托 PeekableItemReader 并扩展 SimpleCompletionPolicy 以实现您的目标。

public class CountryPeekingCompletionPolicyReader extends SimpleCompletionPolicy implements ItemReader<CountrySpecificItem> 

    private PeekableItemReader<? extends CountrySpecificItem> delegate;

    private CountrySpecificItem currentReadItem = null;

    @Override
    public CountrySpecificItem read() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception 
        currentReadItem = delegate.read();
        return currentReadItem;
    

    @Override
    public RepeatContext start(final RepeatContext context) 
        return new ComparisonPolicyTerminationContext(context);
    

    protected class ComparisonPolicyTerminationContext extends SimpleTerminationContext 

        public ComparisonPolicyTerminationContext(final RepeatContext context) 
            super(context);
        

        @Override
        public boolean isComplete() 
            final CountrySpecificItem nextReadItem = delegate.peek();

            // logic to check if same country
            if (currentReadItem.isSameCountry(nextReadItem)) 
                return false;
            

            return true;
        
    

然后在你的上下文中你会定义:

<batch:tasklet>
    <batch:chunk chunk-completion-policy="countrySpecificCompletionPolicy" reader="countrySpecificCompletionPolicy" writer="someWriter" />
</batch:tasklet>

<bean id="countrySpecificCompletionPolicy" class="CountryPeekingCompletionPolicyReader">
     <property name="delegate" ref="peekableReader" />
</bean>


<bean id="peekableReader" class="YourPeekableItemReader" />

编辑:回想一下你的问题,我觉得分区是最干净的方法。使用partitioned step,每个ItemReader(确保scope="step")将从步骤执行上下文中传递一个countryName。是的,您需要一个自定义 Partitioner 类来构建您的执行上下文地图(每个国家/地区一个条目)和一个足够大的硬编码提交间隔以容纳您最大的工作单元,但之后一切都非常样板,并且由于每个从属步骤将只是一个块,因此对于任何可能遇到问题的国家/地区来说,重新启动应该是一件轻而易举的事。

【讨论】:

这就是我们实际开始的地方 :) 但我相信(如果我错了,请纠正我)这种分区实际上违反了 Spring Batch 的主要概念:您通常应该使用确切的项目,您将在阅读器中处理而不是结合 Batch 的功能 - 它使您可以更精细地控制情况。但是,即使与我将可窥视阅读器与完成策略的分区保持一致,它仍然可以工作,但是仍然需要它的自定义实现......让我们等待更多答案,如果没有 - 这个将被接受;) 如果每个分区都覆盖自己的国家/地区,您可以将提交间隔设置为相当大的值,以确保提交覆盖最大的国家/地区。也就是说,“纯”春季批处理方法将是单个读取器/写入器、有意义的块大小(可能是 500 个房间),以及从失败的中部国家中提取和重新处理的可重新启动性。我实际上还有另一个想法,那就是更“真正的北方”,很快就会编辑我的答案。 我试图实现这个解决方案。我有以下错误: Bean 属性“委托”不可写或设置方法无效。 setter 的参数类型是否与 getter 的返回类型匹配?你知道如何解决它吗? 你可能需要一个setDelegate(PeekableItemReader&lt;? extends CountrySpecificItem&gt; delegate) 方法...任何属性都需要一个关联的setter方法

以上是关于用于动态块大小的 Spring Batch 自定义完成策略的主要内容,如果未能解决你的问题,请参考以下文章

Spring Batch 不使用自定义数据源创建表

Spring Batch:如何将 jobParameters 传递给自定义 bean?

在 Spring Batch 应用程序中自定义步骤的参数

Spring Batch - 创建两个数据源以及如何自定义使用其他属性

在 Spring Batch 中动态设置 gridSize(线程数)

如何在培训期间更改批量大小?