Spring Integration - 入站通道适配器执行下游通道并行处理

Posted

技术标签:

【中文标题】Spring Integration - 入站通道适配器执行下游通道并行处理【英文标题】:Spring Integration - Inbound channel Adapter to execute down stream channel parallel processing 【发布时间】:2020-12-03 16:15:29 【问题描述】:

我正在尝试在注释中配置以下工作流

带有轮询器(cron 触发器)的入站通道适配器计划每 30 分钟运行一次 轮询文件表单目录,即 10 个文件并移动到阶段目录 对于每个文件需要并行调用一个批处理作业,即 10 个作业应该与轮询的不同文件并行运行

我能够实现所有目标,但无法配置下游执行器通道以并行运行作业。

以下是参考实现。一切都在工作,即作业是在文件之后启动文件,但它需要为不同的文件并行启动作业 感谢您对此的任何帮助

@InboundChannelAdapter (incoming channel, custompoller) 
public MessageSource<File> pollFile ( Directory Scanner)  
 

public PollerMetadata custompoller(errorhandler) 
poller.trigger(cron for every 10 minutes)


@ServiceActivator(incoming channel)
public MessageHandler filewritertotempdiretory() 
outputchannel(tempdirchannel) 



@ServiceActivator(inputChannel = tempdirchannel)
public MessageHandler tempdirfilehandler() 
    MethodInvokingMessageHandler messageHandler =   (launcher class, "methodname");
        return messageHandler;



Poller Metadata.  Read in some other SO that we should not put the task executor when setting poller on cron, is that  true ?  
also how can i make the messages polled (say 10 messages polled) execute in parallel i.e. add task executor in poller metadata 

    @Bean
    public PollerMetadata preProcessPoller(MessagePublishingErrorHandler errorHandler) 
        PollerMetadata poller = new PollerMetadata();
        poller.setTrigger(new CronTrigger("0/15 * * * * ?"));
        poller.setMaxMessagesPerPoll(Long.valueOf(maxMessagesPerPoll));
        errorHandler.setDefaultErrorChannel(errorChannel());
        poller.setErrorHandler(errorHandler);
        return poller;
    

【问题讨论】:

【参考方案1】:

您需要出示完整的PollerMetadata 配置。

最好的猜测是你还没有设置maxMessagesPerPoll

默认情况下,对于入站通道适配器,maxMessagesPerPoll 为 1。

您可以将TaskExecutor 添加到轮询元数据以并行运行消息,或将incoming channel 设置为ExecutorChannel

【讨论】:

Thaks Gary,现在用 Poller 元数据配置的详细信息修改了问题。我正在查看的每个民意调查的最大消息数目前为 10 条。此外,一旦文件被轮询,它就会被移动到阶段目录以进行进一步处理 &gt;Read in some other SO that we should not put the task executor when setting poller on cron, is that true ? 没有;触发器是什么并不重要。 非常感谢 Gary,当它与启动 spring 批处理作业一起使用时,任何关于任务执行器配置的参考。另外我正在异步执行批处理作业,如果我们设置任务执行器会不会有任何问题。

以上是关于Spring Integration - 入站通道适配器执行下游通道并行处理的主要内容,如果未能解决你的问题,请参考以下文章

Spring-Integration:在异常时不发送Tcp服务器响应

使用 spring-integration-dsl 的动态 http 入站网关

Spring Integration DSL JMS 入站/出站网关

在 Spring Integration 上获取 JMS 标头

如何测试在弹簧集成中使用通道的热文件夹?

Spring Integration 没有输出通道或回复通道