是否可以使轮询器(或 PollableMessageSource)将消息作为列表轮询?
Posted
技术标签:
【中文标题】是否可以使轮询器(或 PollableMessageSource)将消息作为列表轮询?【英文标题】:Is it possible to make a Poller (or PollableMessageSource) to poll messages as List? 【发布时间】:2021-04-19 12:11:47 【问题描述】:按照在 GitHub https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-polling-binder-sample 中找到的关于从 PubSub 订阅轮询消息的示例,我想知道...
是否可以让PollableMessageSource
检索List<Message<?>>
而不是每次轮询的单个消息?
我看到@Poller
符号仅用于Source
类型化对象,从未用于Processor
或Sink
。例如使用@StreamListener
或使用函数式方法时,是否可以在这种情况下使用?
【问题讨论】:
【参考方案1】:PollableMessageSource
绑定和Source
流应用程序完全基于 Spring Integration 的 Poller
和 MessageSource
抽象,其中它的合同是为配置的通道生成单个消息。消息传递的重点实际上是处理不影响其他人的单个消息。一条消息的失败并不意味着流程中的其他消息失败。
另一方面,您可能是指将 GCP Pub/Sub 消息作为 Spring 消息负载中的列表生成。这确实是可能的,但是通过来自 Pub/Sub 消费者和 MessageSource
impl 的一些自定义代码。尽管我会三思而后行,期望从源头上批量处理。如果您的进一步逻辑是关于作为列表处理,您可能会使用聚合器来构建一些小窗口。但同样:这将是一条 Spring 消息。
开始考虑反应式函数实现可能会更好,您确实可以期望 Flux<Message<?>>
作为输入,Spring Cloud Stream 框架会照顾您如何将数据从 Pub/Sub 发送到您的反应式流中期待。
在文档中查看更多信息:https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_reactive_functions_support
【讨论】:
感谢 Artem 的快速回复!有没有关于如何将 Flux> 消耗到固定大小的列表中,相应地处理它并继续处理的示例?还在纠结它是如何阻塞的,在尝试做某事时抛出异常......所以一个起点会很棒! 我不会说收集流式通量列表是个好主意。我们绝对不知道代理上的队列何时结束。对,从来没有。因此,作为函数输入的Flux
仍然很热。因此,试图将其收集到一个列表中是行不通的。您可以考虑使用 `buffer()` 运算符,但这已经是另一回事了...以上是关于是否可以使轮询器(或 PollableMessageSource)将消息作为列表轮询?的主要内容,如果未能解决你的问题,请参考以下文章
Spring 批处理作业应仅在 Spring 集成文件轮询器轮询文件后执行一次