Spring Boot Webflux/Netty - 检测关闭的连接
Posted
技术标签:
【中文标题】Spring Boot Webflux/Netty - 检测关闭的连接【英文标题】:Spring Boot Webflux/Netty - Detect closed connection 【发布时间】:2018-07-26 03:55:13 【问题描述】:我一直在使用 webflux 启动器 (spring-boot-starter-webflux
) 使用 spring-boot 2.0.0.RC1
。我创建了一个返回无限通量的简单控制器。我希望发布者只有在有客户(订阅者)的情况下才可以工作。假设我有一个像这样的控制器:
@RestController
public class Demo
@GetMapping(value = "/")
public Flux<String> getEvents()
return Flux.create((FluxSink<String> sink) ->
while(!sink.isCancelled())
// TODO e.g. fetch data from somewhere
sink.next("DATA");
sink.complete();
).doFinally(signal -> System.out.println("END"));
现在,当我尝试运行该代码并使用 Chrome 访问端点 http://localhost:8080/ 时,我可以看到数据。但是,一旦我关闭浏览器,while 循环就会继续,因为没有触发取消事件。 如何在关闭浏览器后立即终止/取消流式传输?
从这个answer我引用:
目前使用 HTTP,确切的背压信息不是 通过网络传输,因为 HTTP 协议不支持 这。如果我们使用不同的有线协议,这可能会改变。
我假设,由于 HTTP 协议不支持背压,这意味着也不会发出取消请求。
进一步调查,通过分析网络流量,显示只要我关闭浏览器,浏览器就会发送 TCP FIN
。有没有办法配置 Netty(或其他东西),以便半关闭的连接会在发布者上触发取消事件,从而使 while 循环停止?
或者我是否必须编写类似于org.springframework.http.server.reactive.ServletHttpHandlerAdapter
的自己的适配器,并在其中实现自己的订阅者?
感谢您的帮助。
编辑:
如果没有客户端,将在尝试将数据写入套接字时引发IOException
。正如您在stack trace 中看到的那样。
但这还不够好,因为可能需要一段时间才能准备好发送下一个数据块,因此检测消失的客户端需要相同的时间。正如Brian Clozel's answer 中所指出的,这是 Reactor Netty 中的一个已知问题。我尝试通过将依赖项添加到POM.xml
来改用Tomcat。像这样:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
虽然它取代了 Netty 并改用 Tomcat,但由于浏览器不显示任何数据,它似乎没有反应。但是,控制台中没有警告/信息/异常。 spring-boot-starter-webflux
这个版本 (2.0.0.RC1
) 是否应该与 Tomcat 一起工作?
【问题讨论】:
【参考方案1】:由于这是一个已知问题(请参阅 Brian Clozel's answer),我最终使用了一个 Flux
来获取我的真实数据并使用另一个以实现某种 ping/heartbeat 机制。结果,我将两者与Flux.merge()
合并在一起。
在这里您可以看到我的解决方案的简化版本:
@RestController
public class Demo
public interface Notification
public static class MyData implements Notification
…
public boolean isEmpty()…
@GetMapping(value = "/", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<? extends Notification>> getNotificationStream()
return Flux.merge(getEventMessageStream(), getHeartbeatStream());
private Flux<ServerSentEvent<Notification>> getHeartbeatStream()
return Flux.interval(Duration.ofSeconds(2))
.map(i -> ServerSentEvent.<Notification>builder().event("ping").build())
.doFinally(signalType ->System.out.println("END"));
private Flux<ServerSentEvent<MyData>> getEventMessageStream()
return Flux.interval(Duration.ofSeconds(30))
.map(i ->
// TODO e.g. fetch data from somewhere,
// if there is no data return an empty object
return data;
)
.filter(data -> !data.isEmpty())
.map(data -> ServerSentEvent
.builder(data)
.event("message").build());
我将所有内容都包装为ServerSentEvent<? extends Notification>
。 Notification
只是一个标记界面。我使用ServerSentEvent
类中的event
字段来区分数据和ping 事件。由于心跳Flux
会在很短的时间间隔内不断地发送事件,因此检测到客户端离开所花费的时间最多是该时间间隔的长度。请记住,我需要它,因为在我获得一些可以发送的真实数据之前可能需要一段时间,因此,它也可能需要一段时间才能检测到客户端已经消失。像这样,一旦无法发送 ping(或者可能是消息事件),它就会检测到客户端已经消失。
关于标记界面的最后一个注释,我称之为通知。这并不是真正必要的,但它提供了一些类型安全性。没有它,我们可以写 Flux<ServerSentEvent<?>>
而不是 Flux<ServerSentEvent<? extends Notification>>
作为 getNotificationStream() 方法的返回类型。或者也可以让 getHeartbeatStream() 返回Flux<ServerSentEvent<MyData>>
。然而,像这样它会允许任何对象可以被发送,这是我不想要的。因此,我添加了接口。
【讨论】:
我不太明白它是如何工作的。一旦客户端断开连接,带有ping的通量会因为异常而被破坏? 我已经通过在客户端断开 wifi 进行了测试,但它不起作用。但是,当我通过关闭浏览器或浏览器选项卡进行测试时,它正在工作。那么,断开客户端wifi或互联网时如何使其工作?【参考方案2】:我不确定为什么会这样,但我怀疑这是因为选择了生成运算符。我认为使用以下方法会起作用:
return Flux.interval(Duration.ofMillis(500))
.map(input ->
return "DATA";
);
根据to Reactor's reference documentation, you're probably hitting the key difference between generate
and push
(我相信使用 generate 的非常相似的方法可能也会起作用)。
我的评论是指背压信息(Subscriber
愿意接受多少元素),但成功/错误信息是通过网络传达的。
根据您选择的 Web 服务器(Reactor Netty、Tomcat、Jetty 等),关闭客户端连接可能会导致:
服务器端收到取消信号(我认为这是 Netty 支持的) 服务器在尝试写入已关闭的连接时收到错误信号(我相信 Servlet 规范没有提供该回调,我们缺少取消信息)。简而言之:你不需要做任何特别的事情,它应该已经得到支持,但你的Flux
实现可能是这里的实际问题。
更新: this is a known issue in Reactor Netty
【讨论】:
感谢您的回答。事实上,当我像您建议的那样实现Flux
时,它会改善这种情况,这意味着它会在尝试发送下一个数据时检测到浏览器已关闭。但是,在我的用例中,可能需要一段时间才能生成新数据。因此,它也可能需要一段时间才能检测到客户端已消失。作为一种解决方法,我试过这个:return Flux.interval(Duration.ofMillis(500)) .map(input -> return "DATA || null"; ).filter(x -> x != null);
但是当我过滤那些空值时,它不会检测到关闭的连接,直到它尝试发送真实数据(例如非空值)。以上是关于Spring Boot Webflux/Netty - 检测关闭的连接的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot 2 使用 Webflux、Netty 和 HTTP2,导致证书无效,可能配置错误
Spring Webflux Netty http 和 https
spring webflux(netty)处理程序无法解析包含大于 750 字节的 json 的 ServerRequest