Spring集成轮询器与调度程序

Posted

技术标签:

【中文标题】Spring集成轮询器与调度程序【英文标题】:spring integration poller vs dispatcher 【发布时间】:2014-02-16 22:03:13 【问题描述】:

我正在尝试使用 spring 集成设置一个简单的应用程序。目标是简单地使用文件入站通道适配器来监视目录中的新文件并在添加文件时对其进行处理。为简单起见,目前处理文件只是记录一些输出(正在处理的文件的名称)。但是,我确实想以多线程方式处理文件。所以假设有 10 个文件被拾取,应该并行处理,一旦这些文件完成,我们就继续处理接下来的 10 个文件。

为此,我尝试了两种不同的方法,它们似乎都工作得差不多,我想了解使用 poller 或 dispatcher 来处理此类事情之间的区别。

方法 #1 - 使用轮询器

<int-file:inbound-channel-adapter id="filesIn" directory="in">
        <int:poller fixed-rate="1" task-executor="executor" />
</int-file:inbound-channel-adapter>

<int:service-activator ref="moveToStage" method="move" input-channel="filesIn" />

<task:executor id="executor" pool-size="5" queue-capacity="0" rejection-policy="DISCARD" />

因此,据我所知,这里的想法是我们不断轮询目录,一旦收到文件,就会将其发送到 filesIn 通道,直到达到池限制。然后直到池被占用,即使我假设轮询仍在后台继续,也不会发送其他文件。这似乎可行,但我不确定使用每次轮询的最大消息是否有助于降低轮询频率。通过将每次轮询的最大消息数设置为接近池大小。

方法 #2 - 使用调度程序

<int-file:inbound-channel-adapter id="filesIn" directory="in">
    <int:poller fixed-rate="5000" max-messages-per-poll="3" />
</int-file:inbound-channel-adapter>

<int:bridge input-channel="filesIn" output-channel="filesReady" />

<int:channel id="filesReady">
    <int:dispatcher task-executor="executor"/>
</int:channel>

<int:service-activator ref="moveToStage" method="move" input-channel="filesInReady" />

<task:executor id="executor" pool-size="5" queue-capacity="0" rejection-policy="CALLER_RUNS" />

好的,所以这里轮询器没有使用执行器,所以我假设它以顺序方式轮询。每个轮询 3 个文件都应该被拾取,然后发送到 filesReady 通道,该通道然后使用调度程序将文件传递给服务激活器,并且因为它使用调度程序的执行器,它立即返回控制并允许 filesIn 通道发送更多文件。

我想我的问题是我是否正确理解了这两种方法,以及一种方法是否比另一种更好。

谢谢

【问题讨论】:

【参考方案1】:

是的,你的理解是正确的。

一般来说,我会说每毫秒轮询一次(并在队列已满时丢弃轮询)是对资源(CPU 和 I/O)的浪费。

此外,在第一种情况下增加每个轮询的最大消息数也无济于事,因为轮询是在执行程序线程上完成的(调度程序将轮询交给执行程序,该线程将处理mmpp)。

在第二种情况下,由于调度程序线程在轮询期间(而不是之前)进行交接,mmpp 将按预期工作。

因此,一般而言,您的第二个实现是最好的(只要您可以忍受新文件到达时平均 2.5 秒的延迟)。

【讨论】:

好的,这是有道理的。它还允许轮询进程获取文件,而与它们在管道中的处理速度无关。我想知道文件轮询器是否选择了太多文件并传递到下一个使用调度程序和任务执行器的通道。因为我使用 caller_runs 策略确实会使轮询器在线程数达到其限制时也将暂停,现在调度程序必须在控制返回到文件轮询器之前完成当前排队的文件,以便它可以继续轮询更多文件。

以上是关于Spring集成轮询器与调度程序的主要内容,如果未能解决你的问题,请参考以下文章

Spring 批处理作业应仅在 Spring 集成轮询器之后执行一次

Spring 批处理作业应仅在 Spring 集成文件轮询器轮询文件后执行一次

Spring Integration 没有为端点定义轮询器

Spring 集成 ConcurrentMetadataStore / RedisMetadataStore

没有为通道适配器定义轮询器

是否可以为数据源中的每个实体配置轮询器?