Spring Batch 中的跳过是如何实现的?

Posted

技术标签:

【中文标题】Spring Batch 中的跳过是如何实现的?【英文标题】:How is the skipping implemented in Spring Batch? 【发布时间】:2013-05-10 03:58:31 【问题描述】:

我想知道如何在我的ItemWriter 中确定 Spring Batch 当前是处于块处理模式还是处于备用单项处理模式。首先,我没有找到有关如何实现此回退机制的信息。

即使我还没有找到解决我实际问题的方法,我也想与你分享我对回退机制的了解。

如果我遗漏了任何内容,请随时添加带有附加信息的答案 ;-)

【问题讨论】:

你能解释一下现实世界的问题,它让你想到“作者如何知道当前的处理模式”这个问题吗? 当然 :-) 我正在存储一个业务日志(在我的技术日志旁边)。在此日志中,每个项目的消息应该只出现一次。如果在处理过程中出现异常,我还将在业务日志中为此项目写入错误日志。如果一个项目已被处理但回滚,我对它的错误日志不感兴趣。如果我在单项处理中,我只想记录这些错误。否则,如果我处于块模式,我可能会为正常的项目记录错误,只是因为它们位于坏块中。 【参考方案1】:

跳过机制的实现可以在FaultTolerantChunkProcessor和RetryTemplate中找到。

假设您配置了skippable exceptions,但没有配置retryable exceptions。并且您当前的块中有一个失败的项目导致异常。

现在,首先要写入整个块。在处理器的write() 方法中,您可以看到调用了RetryTemplate。它还获得了对 RetryCallbackRecoveryCallback 的两个引用。

切换到RetryTemplate。找到以下方法:

protected <T> T doExecute(RetryCallback<T> retryCallback, RecoveryCallback<T> recoveryCallback, RetryState state)

在那里你可以看到RetryTemplate 被重试,只要它没有用尽(即在我们的配置中恰好一次)。这样的重试将由可重试异常引起。不可重试的异常会立即中止这里的重试机制。

重试用尽或中止后,将调用RecoveryCallback

e = handleRetryExhausted(recoveryCallback, context, state);

这就是单项处理模式现在开始的地方!

RecoveryCallback(在处理器的write() 方法中定义!)将锁定输入块(inputs.setBusy(true))并运行其scan() 方法。在那里你可以看到,一个项目是从块中取出的:

List&lt;O&gt; items = Collections.singletonList(outputIterator.next());

如果ItemWriter 可以正确处理此单个项目,则该块将完成并且ChunkOrientedTasklet 将运行另一个块(用于下一个单个项目)。这将导致对RetryCallback 的常规调用,但由于该块已被RecoveryTemplate 锁定,因此将立即调用scan() 方法:

if (!inputs.isBusy()) 
    // ...

else 
    scan(contribution, inputs, outputs, chunkMonitor);

因此将处理另一个单个项目并重复此过程,直到原始块被逐项处理:

if (outputs.isEmpty()) 
    inputs.setBusy(false);

就是这样。我希望你觉得这很有帮助。而且我更希望您可以通过搜索引擎轻松找到它,并且不要浪费太多时间,自己找到它。 ;-)

【讨论】:

【参考方案2】:

解决我原来的问题的一种可能方法(ItemWriter 想知道,它是在块模式还是单项模式)可能是以下替代方案之一:


只有当传递的块大小为 1 时,才需要进行任何进一步的检查

当传递的块是java.util.Collections.SingletonList 时,我们会非常确定,因为FaultTolerantChunkProcessor 会执行以下操作:

列表项 = Collections.singletonList(outputIterator.next());

不幸的是,这个类是私有的,所以我们不能用instanceOf检查它。

反过来,如果块是 ArrayList,我们也可以确定,因为 Spring Batch 的 Chunk 类使用它:

私有列表项 = new ArrayList();

一个模糊的地方是从执行上下文中读取的缓冲项。但我希望那些也是 ArrayLists。

不管怎样,我还是觉得这个方法太模糊了。我宁愿让框架提供这些信息。


另一种方法是将我的ItemWriter 挂接到框架执行中。也许ItemWriteListener.onWriteError() 比较合适。

更新:如果您处于单项模式并在 ItemWriter 中引发异常,则不会调用 onWriteError() 方法。我认为这是一个提交的错误:https://jira.springsource.org/browse/BATCH-2027

所以这个替代方案退出了。


这里有一个 sn-p 可以在没有任何框架的情况下直接在编写器中执行相同操作

    private int writeErrorCount = 0;

@Override
public void write(final List<? extends Long> items) throws Exception 
    try 
        writeWhatever(items);
     catch (final Exception e) 
        if (this.writeErrorCount == 0) 
            this.writeErrorCount = items.size();
         else 
            this.writeErrorCount--;
        

        throw e;
    
    this.writeErrorCount--;


public boolean isWriterInSingleItemMode() 
    return writeErrorCount != 0;

注意:人们应该在这里检查可跳过的异常,而不是一般情况下检查Exception

【讨论】:

我们不能在单例列表上调用 instanceof,但这有效:Class&lt;?&gt; singletonListClazz = Class.forName( "java.util.Collections$SingletonList" ); boolean retrying = false; if( items.getClass().equals( singletonListClazz ) ) retrying = true; @slh777 感谢您的建议。那确实可行。但是,我对这种方法并不满意,因为我们严重依赖框架的实现细节(即在 SingleItemMode 中返回 SingletonList)。我宁愿看到这个被框架封装。 :-(

以上是关于Spring Batch 中的跳过是如何实现的?的主要内容,如果未能解决你的问题,请参考以下文章

有没有办法只为特定作业跳过 Spring Batch 的持久元数据?

ItemWriter 的 Spring Batch 跳过异常

Node.js JavaScript 片段中的跳过代码

如何使用想要的跳过检查缺少的数字序列

使用FilterInputStream的跳过方法

spring batch读取数据库怎么用