用于动态块大小的 Spring Batch 自定义完成策略
Posted
技术标签:
【中文标题】用于动态块大小的 Spring Batch 自定义完成策略【英文标题】:Spring Batch custom completion policy for dynamic chunk size 【发布时间】:2016-09-20 07:27:58 【问题描述】:上下文
我们有一个批处理作业,将本地化的国家名称(即将国家名称翻译成不同的语言)从外部复制到我们的数据库中。这个想法是在一个块中处理单个国家的所有本地化国家名称(即第一个块 - 安道尔的所有翻译,下一个块 - 阿联酋的所有翻译等)。我们使用JdbcCursorItemReader
读取外部数据 + 一些 oracle 分析函数来提供该国家可用的翻译总数:类似于
select country_code, language_code, localized_name, COUNT(1) OVER(PARTITION BY c_lng.country_code) as lng_count
from EXT_COUNTRY_LNG c_lng
order by c_lng.countty_code, c_lng.language_code
问题
因此,按块切割此输入看起来很简单:当您读取lng_count
中指定的确切行数时停止块并从下一个读取行开始一个新的,但实际上看起来并不那么简单: (
首先要尝试的是自定义完成策略。但问题是,它无权访问ItemReader
读取的最后一项——您应该明确地将其置于阅读器的上下文中,并将其重新置于策略中。不喜欢它,因为它需要额外的阅读器修改/添加阅读器侦听器。此外,我不喜欢来回序列化/反序列化相同的项目。而且我觉得JobContext
/StepContext
不是存放此类数据的好地方。
还有 RepeatContext
看起来是存放此类数据的更好地方,但我无法轻松地找到它...
所以最后我们得到了这样的解决方案:
@Bean(name = "localizedCountryNamesStep")
@JobScope
public Step insertCountryStep(
final StepBuilderFactory stepBuilderFactory,
final MasterdataCountryNameReader countryNameReader,
final MasterdataCountryNameProcessor countryNameProcessor,
final MasterdataCountryNameWriter writer)
/* Use the same fixed-commit policy, but update it's chunk size dynamically */
final SimpleCompletionPolicy policy = new SimpleCompletionPolicy();
return stepBuilderFactory.get("localizedCountryNamesStep")
.<ExtCountryLng, LocalizedCountryName> chunk(policy)
.reader(countryNameReader)
.listener(new ItemReadListener<ExtCountryLng>()
@Override
public void beforeRead()
// do nothing
@Override
public void afterRead(final ExtCountryLng item)
/* Update the cunk size after every read: consequent reads
inside the same country = same chunk do nothing since lngCount is always the same there */
policy.setChunkSize(item.getLngCount());
@Override
public void onReadError(final Exception ex)
// do nothing
)
.processor(countryNameProcessor)
.writer(writer)
.faultTolerant()
.skip(RuntimeException.class)
.skipLimit(Integer.MAX_VALUE) // Batch does not support unlimited skip
.retryLimit(0) // this solution disables only retry, but not recover
.build();
它工作正常,只需要最少的代码更改,但对我来说还是有点难看。所以我想知道,当ItemReader
上已经提供了所有必需的信息时,是否还有另一种优雅的方法可以在 Spring Batch 中执行动态块大小?
【问题讨论】:
afterRead 听起来不是更改块大小的正确位置,我会将其放在 afterWrite 中以对下一个块生效 从逻辑上讲afterWrite
听起来是对的,但是 1)在编写一个不带额外数据库查询的卡盘后,您没有该信息 2)第一个块的大小仍应以某种方式确定 - 另一个额外的数据库查询?
您是否在处理之前清除了目标表?还是这只是一份一次性工作?
@DeanClark ,不,这是一个全面的“和解”:插入新记录,更新更新记录,删除删除记录。这就是为什么必须一次向作者提供与单个国家/地区相关的所有本地化国家/地区名称。
【参考方案1】:
最简单的方法是简单地按国家/地区划分。这样,每个国家/地区都会有自己的步伐,您还可以跨国家/地区进行线程以提高性能。
如果需要单个阅读器,您可以包装一个委托 PeekableItemReader
并扩展 SimpleCompletionPolicy
以实现您的目标。
public class CountryPeekingCompletionPolicyReader extends SimpleCompletionPolicy implements ItemReader<CountrySpecificItem>
private PeekableItemReader<? extends CountrySpecificItem> delegate;
private CountrySpecificItem currentReadItem = null;
@Override
public CountrySpecificItem read() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception
currentReadItem = delegate.read();
return currentReadItem;
@Override
public RepeatContext start(final RepeatContext context)
return new ComparisonPolicyTerminationContext(context);
protected class ComparisonPolicyTerminationContext extends SimpleTerminationContext
public ComparisonPolicyTerminationContext(final RepeatContext context)
super(context);
@Override
public boolean isComplete()
final CountrySpecificItem nextReadItem = delegate.peek();
// logic to check if same country
if (currentReadItem.isSameCountry(nextReadItem))
return false;
return true;
然后在你的上下文中你会定义:
<batch:tasklet>
<batch:chunk chunk-completion-policy="countrySpecificCompletionPolicy" reader="countrySpecificCompletionPolicy" writer="someWriter" />
</batch:tasklet>
<bean id="countrySpecificCompletionPolicy" class="CountryPeekingCompletionPolicyReader">
<property name="delegate" ref="peekableReader" />
</bean>
<bean id="peekableReader" class="YourPeekableItemReader" />
编辑:回想一下你的问题,我觉得分区是最干净的方法。使用partitioned step,每个ItemReader(确保scope="step"
)将从步骤执行上下文中传递一个countryName
。是的,您需要一个自定义 Partitioner
类来构建您的执行上下文地图(每个国家/地区一个条目)和一个足够大的硬编码提交间隔以容纳您最大的工作单元,但之后一切都非常样板,并且由于每个从属步骤将只是一个块,因此对于任何可能遇到问题的国家/地区来说,重新启动应该是一件轻而易举的事。
【讨论】:
这就是我们实际开始的地方 :) 但我相信(如果我错了,请纠正我)这种分区实际上违反了 Spring Batch 的主要概念:您通常应该使用确切的项目,您将在阅读器中处理而不是结合 Batch 的功能 - 它使您可以更精细地控制情况。但是,即使与我将可窥视阅读器与完成策略的分区保持一致,它仍然可以工作,但是仍然需要它的自定义实现......让我们等待更多答案,如果没有 - 这个将被接受;) 如果每个分区都覆盖自己的国家/地区,您可以将提交间隔设置为相当大的值,以确保提交覆盖最大的国家/地区。也就是说,“纯”春季批处理方法将是单个读取器/写入器、有意义的块大小(可能是 500 个房间),以及从失败的中部国家中提取和重新处理的可重新启动性。我实际上还有另一个想法,那就是更“真正的北方”,很快就会编辑我的答案。 我试图实现这个解决方案。我有以下错误: Bean 属性“委托”不可写或设置方法无效。 setter 的参数类型是否与 getter 的返回类型匹配?你知道如何解决它吗? 你可能需要一个setDelegate(PeekableItemReader<? extends CountrySpecificItem> delegate)
方法...任何属性都需要一个关联的setter方法以上是关于用于动态块大小的 Spring Batch 自定义完成策略的主要内容,如果未能解决你的问题,请参考以下文章
Spring Batch:如何将 jobParameters 传递给自定义 bean?
Spring Batch - 创建两个数据源以及如何自定义使用其他属性