是否可以使轮询器(或 PollableMessageSource)将消息作为列表轮询?

Posted

技术标签:

【中文标题】是否可以使轮询器(或 PollableMessageSource)将消息作为列表轮询?【英文标题】:Is it possible to make a Poller (or PollableMessageSource) to poll messages as List? 【发布时间】:2021-04-19 12:11:47 【问题描述】:

按照在 GitHub https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-polling-binder-sample 中找到的关于从 PubSub 订阅轮询消息的示例,我想知道...

是否可以让PollableMessageSource 检索List<Message<?>> 而不是每次轮询的单个消息?

我看到@Poller 符号仅用于Source 类型化对象,从未用于ProcessorSink。例如使用@StreamListener 或使用函数式方法时,是否可以在这种情况下使用?

【问题讨论】:

【参考方案1】:

PollableMessageSource 绑定和Source 流应用程序完全基于 Spring Integration 的 PollerMessageSource 抽象,其中它的合同是为配置的通道生成单个消息。消息传递的重点实际上是处理不影响其他人的单个消息。一条消息的失败并不意味着流程中的其他消息失败。

另一方面,您可能是指将 GCP Pub/Sub 消息作为 Spring 消息负载中的列表生成。这确实是可能的,但是通过来自 Pub/Sub 消费者和 MessageSource impl 的一些自定义代码。尽管我会三思而后行,期望从源头上批量处理。如果您的进一步逻辑是关于作为列表处理,您可能会使用聚合器来构建一些小窗口。但同样:这将是一条 Spring 消息。

开始考虑反应式函数实现可能会更好,您确实可以期望 Flux<Message<?>> 作为输入,Spring Cloud Stream 框架会照顾您如何将数据从 Pub/Sub 发送到您的反应式流中期待。

在文档中查看更多信息:https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_reactive_functions_support

【讨论】:

感谢 Artem 的快速回复!有没有关于如何将 Flux> 消耗到固定大小的列表中,相应地处理它并继续处理的示例?还在纠结它是如何阻塞的,在尝试做某事时抛出异常......所以一个起点会很棒! 我不会说收集流式通量列表是个好主意。我们绝对不知道代理上的队列何时结束。对,从来没有。因此,作为函数输入的Flux 仍然很热。因此,试图将其收集到一个列表中是行不通的。您可以考虑使用 `buffer()` 运算符,但这已经是另一回事了...

以上是关于是否可以使轮询器(或 PollableMessageSource)将消息作为列表轮询?的主要内容,如果未能解决你的问题,请参考以下文章

Cacti优化之spine轮询器

AngularJs轮询器写法

Spring 批处理作业应仅在 Spring 集成文件轮询器轮询文件后执行一次

没有为通道适配器定义轮询器

如何使用 ScheduledExecutorService 实现固定速率轮询器?

Sprint 集成 DSL - Http 入站适配器和轮询器