Spring boot 反应式和 EventSource 的工作示例
Posted
技术标签:
【中文标题】Spring boot 反应式和 EventSource 的工作示例【英文标题】:Working example of Spring boot reactive and EventSource 【发布时间】:2021-02-14 01:03:42 【问题描述】:我正在尝试使用响应式 mongodb 和 EventSource 进行工作的 Spring Boot。 但是,我面临重复重新打开连接的问题,因为它已被服务器关闭。我什至怀疑这是否真的有效,因为我没有找到任何具有反应式数据库和事件源的工作示例......
能否请您指出一个工作示例或告诉我我的代码有什么问题?
这里是代码的主要部分:
pom.xml
<properties>
<java.version>1.8</java.version>
<junit-jupiter.version>5.3.2</junit-jupiter.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.5.RELEASE</version>
</parent>
<dependencies>
<!-- webflux reactive -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- thymeleaf -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
<!-- exclude junit 4, prefer junit 5 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- junit 5 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>$junit-jupiter.version</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
正如您在 pom 中看到的,我使用的是嵌入式 tomcat(我已经尝试过使用 Netty,默认的 Spring Boot 服务器......)。 此外,我正在将应用程序部署到任何远程服务器,但只是在我的本地服务器上尝试(Windows 10)。
网页:
let source = new EventSource("/comment/stream");
source.addEventListener("message", function (event)
// These events are JSON, so parsing and DOM fiddling are needed
var comment = JSON.parse(event.data);
console.log(comment );
);
source.addEventListener("error", function (event)
console.log("error", event);
this.close();
);
休息控制器:
@RestController
public class CommentController
@Autowired
private CommentRepository commentRepository;
@PostMapping(path = "/comment")
public Mono<Comment> comment(@RequestBody Comment comment)
return this.commentRepository.save(comment);
@GetMapping(path = "/comment/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Comment> feed()
return this.commentRepository.findAll();
数据库存储库:
@Repository
public interface CommentRepository extends ReactiveSortingRepository<Comment, String>
Flux<Comment> findAll();
同样,使用 EventSource 的 Web 客户端每秒都在重新连接,因为连接已被服务器关闭。
谢谢!
【问题讨论】:
您的代码中没有任何内容可以解释原因,我们需要更多信息、更多客户端代码、更多日志。 【参考方案1】:我不太确定,您提供的信息太少,无法说明您的连接关闭的原因。没有日志,您也没有透露任何有关其部署位置的信息。
我只会根据个人经验回答这个问题。我在 heroku 上部署了一个使用事件流的应用程序,它们在每个应用程序前面都有一个代理/负载均衡器,它将终止任何在 60 秒后不发送任何内容的连接。
正如这里提到的Why are event sources closed after 30-60 sec 它证实了我一直注意到的。
要解决此问题,您可以使用 websockets 实现 ping/pong 消息,或者如果像我一样使用 ServerSentEvents
,我实现了保持活动消息。
.GET("", accept(TEXT_EVENT_STREAM), request -> ok()
.contentType(TEXT_EVENT_STREAM)
.header("Cache-Control", "no-transform")
.body(Flux.merge(myHandler.getEvents()),
Flux.interval(Duration.ofSeconds(15))
.map(aLong -> ServerSentEvent.builder()
.comment("keep alive")
.build())),
new ParameterizedTypeReference<List<MyClass>>() ))
我从我的一个项目中获取了这段代码 sn-p。在这里你可以看到我与我当前的流合并了一个通量,在给定的时间间隔(15秒)将发出一个ServerSentEvent
,只有一个保持活动的评论。由于它是一条评论,它会被客户忽略。
需要说明的是,常规流 myHandler.getEvents
返回包裹在 ServerSentEvent
s 中的数据。
【讨论】:
以上是关于Spring boot 反应式和 EventSource 的工作示例的主要内容,如果未能解决你的问题,请参考以下文章
带有反应式 mongodb 的 Spring Boot 不断尝试连接到 localhost