Spring WebFlux 休息控制器仅服务于前两个订阅

Posted

技术标签:

【中文标题】Spring WebFlux 休息控制器仅服务于前两个订阅【英文标题】:Spring WebFlux rest controller serves only first two subscriptions 【发布时间】:2018-09-08 12:18:47 【问题描述】:

Flux.create 方法以编程方式创建了一个通量:

Flux<Tweet> flux = Flux.<Tweet>create(emitter -> ...);

有一个休息控制器:

@RestController
public class StreamController 
    ...

    @GetMapping("/top-words")
    public Flux<TopWords> streamTopWords() 
        return topWordsStream.getTopWords();
    

有几个 Web 客户端(在独立进程中):

Flux<TopWords> topWordsFlux = WebClient.create(".../top-words")
        .method(HttpMethod.GET)
        .accept(MediaType.TEXT_EVENT_STREAM)
        .retrieve()
        .bodyToFlux(TopWords.class)
        .subscribe(System.out::println);

javascript 中有几个 EventSource 实例:

var eventSource = new EventSource(".../top-words");

eventSource.onmessage = function (e) 
    console.log("Processing message: ", e.data);
;

只有前两个“订阅者”会开始接收消息(无论是 Web 客户端还是 EventSource 实例)。另一个将打开连接,获得 HTTP 200 状态,但事件流保持为空。客户端或服务器端都没有错误。

我不明白,“2 个订阅者”的限制在哪里。如果我想支持超过 2 个订阅者,我需要做什么?

应用程序使用 Spring Boot 2.0.0.RELEASE 构建,并使用 spring-boot-starter-webflux 自动配置。默认配置没有改变。

【问题讨论】:

您能否提供有关提供Flux&lt;TopWords&gt;(代码sn-p)的实现的更多信息?如果您在此处添加 .log() 运算符,您会看到什么:.getTopWords().log() 感谢您的提示。这是我尝试适应的底层 API 的限制(Twitter 流 API)。我通过Flux.create 创建了 Flux,但没有意识到发射器是按订阅者使用的(不共享)。这显然是我的误解,因为文件在这方面很清楚。 有趣!随意回答您自己的问题,这可能会引起其他人的兴趣! 【参考方案1】:

我尝试适应的底层 API 存在限制(Twitter 流 API)。

目标是连接到 Twitter 一次并处理各种不同订阅者的推文流。

最初我认为传递给Flux.create 方法的发射器总是对所有订阅者使用相同的FluxSink。这当然没有意义。相反,FluxSink 是按订阅者提供的,正如 javadoc 明确指出的那样。

我使用 Twitter 侦听器实现了我的用例,该侦听器允许注册(和取消注册)多个 FluxSink 实例。这样,单个推文流就可以被各种不同的订阅者订阅。

Flux<Tweet> flux = Flux.<Tweet>create(twitterListener::addSink);

我的 twitterListener 实现了 spring-social-twitter 项目中的 org.springframework.social.twitter.api.StreamListener

【讨论】:

以上是关于Spring WebFlux 休息控制器仅服务于前两个订阅的主要内容,如果未能解决你的问题,请参考以下文章

Spring WebFlux WebClient 弹性和性能

Angular 6 到 Spring 引导休息服务 CORS 问题

Spring项目中APN的休息控制器(推送通知)

Spring WebFlux:从控制器提供文件

Spring @Async 与 Spring WebFlux

Spring Boot Webflux/Netty - 检测关闭的连接