Spring Webflux 端点作为主题工作

Posted

技术标签:

【中文标题】Spring Webflux 端点作为主题工作【英文标题】:Spring Webflux endpoint working as a topic 【发布时间】:2021-07-03 11:50:46 【问题描述】:

我有一个 Flux 端点,我提供给客户(订阅者)以接收更新的价格。我正在测试它通过浏览器访问 URL (http://localhost:8080/prices),它工作正常。我面临的问题(我可能在这里遗漏了一些概念)是当我在许多浏览器中打开此 URL 时,我希望在所有浏览器中都收到通知,但只有一个收到。它作为队列而不是主题工作(如在消息代理中)。这是正确的行为吗?

@GetMapping(value = "prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Collection<Price>>> prices() 
return Flux.interval(Duration.ofSeconds(5))
        .map(sec -> pricesQueue.get())
        .filter(prices -> !prices.isEmpty())
        .map(prices -> ServerSentEvent.<Collection<Price>> builder()
                .event("status-changed")
                .data(prices)
                .build());

【问题讨论】:

【参考方案1】:

get 不是标准的队列操作,但这几乎可以肯定是因为您的pricesQueue.get() 方法不是 幂等的。对于每个请求(在这种情况下,您打开的每个浏览器窗口),您都会得到一个新的通量,它每 5 秒调用一次pricesQueue.get()。现在,如果pricesQueue.get() 只是检索队列中的最新项目并且什么都不做,那么一切都很好 - 您的所有订阅者都会收到相同的项目,并显示相同的项目。但是,如果它更像poll(),在它删除队列中的项目检索到它之后,那么只有第一个通量会获得该值 - 其余的不会,到那时它将被删除。

您实际上有两个主要选择:

更改您的 get() 实现(或实现新方法),使其不会改变队列,仅检索值。 将助焊剂变成热助焊剂。将Flux.interval(Duration.ofSeconds(5)).map(sec -&gt; pricesQueue.get()).publish().autoConnect() 存储为字段(假设为queueFlux),然后在控制器方法中返回queueFlux.filter(prices -&gt; !prices.isEmpty()).map(...)

【讨论】:

是的,get() 是对的,它的行为类似于 poll(),尽管我认为这不是我在这里缺少的概念。我想要的行为与使用 SseEmiter 类相同,只有当前订阅者会收到通知(所有订阅者)。 更改 get() 实现以保留更多订阅者的值不是可取的行为,它还会使订阅者多次收到相同的通知。 把通量变成热通量很好(谢谢),但它也让更多的订阅者接收到价值。正如我所说,我只想让当前订阅者在某个时刻收到通知。 @VeryNiceArgumentException 在这种情况下,请尝试使用publish() 而不是replay() 它有点工作。我面临的问题是:3 个订阅者在等待价格,价格 1 到达,只有订阅者 1 收到,然后价格 2 到达,订阅者 1 和订阅者 2 收到,然后价格 3 到达,所有 3订阅者收到它,从那时起,一切都像魅力一样。有什么想法吗?

以上是关于Spring Webflux 端点作为主题工作的主要内容,如果未能解决你的问题,请参考以下文章

spring 5 webflux 功能端点请求中不存在访问控制源头

如何使用 Spring WebFlux 为执行器端点添加 URL 别名?

如何在 spring-mvc 中将日志记录添加到 webflux 端点?

如何在 spring-webflux RouterFunction 端点中使用 OpenApi 注释?

Spring WebFlux Webclient 接收应用程序/八位字节流文件作为 Mono

Spring Webflux 2.4.2 - 执行器 /auditevents /httptrace /integrationgraph /sessions 端点上的 404