Spring Webflux 端点作为主题工作
Posted
技术标签:
【中文标题】Spring Webflux 端点作为主题工作【英文标题】:Spring Webflux endpoint working as a topic 【发布时间】:2021-07-03 11:50:46 【问题描述】:我有一个 Flux 端点,我提供给客户(订阅者)以接收更新的价格。我正在测试它通过浏览器访问 URL (http://localhost:8080/prices),它工作正常。我面临的问题(我可能在这里遗漏了一些概念)是当我在许多浏览器中打开此 URL 时,我希望在所有浏览器中都收到通知,但只有一个收到。它作为队列而不是主题工作(如在消息代理中)。这是正确的行为吗?
@GetMapping(value = "prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Collection<Price>>> prices()
return Flux.interval(Duration.ofSeconds(5))
.map(sec -> pricesQueue.get())
.filter(prices -> !prices.isEmpty())
.map(prices -> ServerSentEvent.<Collection<Price>> builder()
.event("status-changed")
.data(prices)
.build());
【问题讨论】:
【参考方案1】:get
不是标准的队列操作,但这几乎可以肯定是因为您的pricesQueue.get()
方法不是 幂等的。对于每个请求(在这种情况下,您打开的每个浏览器窗口),您都会得到一个新的通量,它每 5 秒调用一次pricesQueue.get()
。现在,如果pricesQueue.get()
只是检索队列中的最新项目并且什么都不做,那么一切都很好 - 您的所有订阅者都会收到相同的项目,并显示相同的项目。但是,如果它更像poll()
,在它删除队列中的项目检索到它之后,那么只有第一个通量会获得该值 - 其余的不会,到那时它将被删除。
您实际上有两个主要选择:
更改您的get()
实现(或实现新方法),使其不会改变队列,仅检索值。
将助焊剂变成热助焊剂。将Flux.interval(Duration.ofSeconds(5)).map(sec -> pricesQueue.get()).publish().autoConnect()
存储为字段(假设为queueFlux
),然后在控制器方法中返回queueFlux.filter(prices -> !prices.isEmpty()).map(...)
。
【讨论】:
是的,get() 是对的,它的行为类似于 poll(),尽管我认为这不是我在这里缺少的概念。我想要的行为与使用 SseEmiter 类相同,只有当前订阅者会收到通知(所有订阅者)。 更改 get() 实现以保留更多订阅者的值不是可取的行为,它还会使订阅者多次收到相同的通知。 把通量变成热通量很好(谢谢),但它也让更多的订阅者接收到价值。正如我所说,我只想让当前订阅者在某个时刻收到通知。 @VeryNiceArgumentException 在这种情况下,请尝试使用publish()
而不是replay()
?
它有点工作。我面临的问题是:3 个订阅者在等待价格,价格 1 到达,只有订阅者 1 收到,然后价格 2 到达,订阅者 1 和订阅者 2 收到,然后价格 3 到达,所有 3订阅者收到它,从那时起,一切都像魅力一样。有什么想法吗?以上是关于Spring Webflux 端点作为主题工作的主要内容,如果未能解决你的问题,请参考以下文章
spring 5 webflux 功能端点请求中不存在访问控制源头
如何使用 Spring WebFlux 为执行器端点添加 URL 别名?
如何在 spring-mvc 中将日志记录添加到 webflux 端点?
如何在 spring-webflux RouterFunction 端点中使用 OpenApi 注释?
Spring WebFlux Webclient 接收应用程序/八位字节流文件作为 Mono
Spring Webflux 2.4.2 - 执行器 /auditevents /httptrace /integrationgraph /sessions 端点上的 404