Spring boot 反应式和 EventSource 的工作示例

Posted

技术标签:

【中文标题】Spring boot 反应式和 EventSource 的工作示例【英文标题】:Working example of Spring boot reactive and EventSource 【发布时间】:2021-02-14 01:03:42 【问题描述】:

我正在尝试使用响应式 mongodb 和 EventSource 进行工作的 Spring Boot。 但是,我面临重复重新打开连接的问题,因为它已被服务器关闭。我什至怀疑这是否真的有效,因为我没有找到任何具有反应式数据库和事件源的工作示例......

能否请您指出一个工作示例或告诉我我的代码有什么问题?

这里是代码的主要部分:

pom.xml

<properties>
  <java.version>1.8</java.version>
  <junit-jupiter.version>5.3.2</junit-jupiter.version>
</properties>

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.3.5.RELEASE</version>
</parent>

<dependencies>

<!-- webflux reactive -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<!-- thymeleaf -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

<!-- exclude junit 4, prefer junit 5 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
  <exclusions>
    <exclusion>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- junit 5 -->
<dependency>
  <groupId>org.junit.jupiter</groupId>
  <artifactId>junit-jupiter-engine</artifactId>
  <version>$junit-jupiter.version</version>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-devtools</artifactId>
  <optional>true</optional>
</dependency>

<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>

</dependencies>

正如您在 pom 中看到的,我使用的是嵌入式 tomcat(我已经尝试过使用 Netty,默认的 Spring Boot 服务器......)。 此外,我正在将应用程序部署到任何远程服务器,但只是在我的本地服务器上尝试(Windows 10)。

网页:

    let source = new EventSource("/comment/stream");

    source.addEventListener("message", function (event) 
        // These events are JSON, so parsing and DOM fiddling are needed
        var comment = JSON.parse(event.data);
        console.log(comment ); 
    );

    source.addEventListener("error", function (event) 
      console.log("error", event);
      this.close();
    );

休息控制器:

@RestController
public class CommentController 

  @Autowired
  private CommentRepository commentRepository;

  @PostMapping(path = "/comment")
  public Mono<Comment> comment(@RequestBody Comment comment) 
    return this.commentRepository.save(comment);
  

  @GetMapping(path = "/comment/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public Flux<Comment> feed() 
    return this.commentRepository.findAll();
  


数据库存储库:

@Repository
public interface CommentRepository extends ReactiveSortingRepository<Comment, String> 

 Flux<Comment> findAll();

同样,使用 EventSource 的 Web 客户端每秒都在重新连接,因为连接已被服务器关闭。

谢谢!

【问题讨论】:

您的代码中没有任何内容可以解释原因,我们需要更多信息、更多客户端代码、更多日志。 【参考方案1】:

我不太确定,您提供的信息太少,无法说明您的连接关闭的原因。没有日志,您也没有透露任何有关其部署位置的信息。

我只会根据个人经验回答这个问题。我在 heroku 上部署了一个使用事件流的应用程序,它们在每个应用程序前面都有一个代理/负载均衡器,它将终止任何在 60 秒后不发送任何内容的连接。

正如这里提到的Why are event sources closed after 30-60 sec 它证实了我一直注意到的。

要解决此问题,您可以使用 websockets 实现 ping/pong 消息,或者如果像我一样使用 ServerSentEvents,我实现了保持活动消息。

    .GET("", accept(TEXT_EVENT_STREAM), request -> ok()
        .contentType(TEXT_EVENT_STREAM)
        .header("Cache-Control", "no-transform")
        .body(Flux.merge(myHandler.getEvents()), 
                  Flux.interval(Duration.ofSeconds(15))
                          .map(aLong -> ServerSentEvent.builder()
                                                       .comment("keep alive")
                                                       .build())),
        new ParameterizedTypeReference<List<MyClass>>() ))

我从我的一个项目中获取了这段代码 sn-p。在这里你可以看到我与我当前的流合并了一个通量,在给定的时间间隔(15秒)将发出一个ServerSentEvent,只有一个保持活动的评论。由于它是一条评论,它会被客户忽略。

需要说明的是,常规流 myHandler.getEvents 返回包裹在 ServerSentEvents 中的数据。

【讨论】:

以上是关于Spring boot 反应式和 EventSource 的工作示例的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot 2 反应式 webflux

带有反应式 mongodb 的 Spring Boot 不断尝试连接到 localhost

Spring Boot 反应式 Web 服务

Spring Boot Webflux 反应式 API

如何从 nginx 发送请求并将容器反应到 Spring Boot 容器?

使用 @EnableResourceServer 支持 Spring Boot 反应式 (webflux)