Spring Boot Webflux/Netty - 检测关闭的连接

Posted

技术标签:

【中文标题】Spring Boot Webflux/Netty - 检测关闭的连接【英文标题】:Spring Boot Webflux/Netty - Detect closed connection 【发布时间】:2018-07-26 03:55:13 【问题描述】:

我一直在使用 webflux 启动器 (spring-boot-starter-webflux) 使用 spring-boot 2.0.0.RC1。我创建了一个返回无限通量的简单控制器。我希望发布者只有在有客户(订阅者)的情况下才可以工作。假设我有一个像这样的控制器:

@RestController
public class Demo 

    @GetMapping(value = "/")
    public Flux<String> getEvents()
        return Flux.create((FluxSink<String> sink) -> 

            while(!sink.isCancelled())

                // TODO e.g. fetch data from somewhere

                sink.next("DATA");
            
            sink.complete();
        ).doFinally(signal -> System.out.println("END"));
    


现在,当我尝试运行该代码并使用 Chrome 访问端点 http://localhost:8080/ 时,我可以看到数据。但是,一旦我关闭浏览器,while 循环就会继续,因为没有触发取消事件。 如何在关闭浏览器后立即终止/取消流式传输?

从这个answer我引用:

目前使用 HTTP,确切的背压信息不是 通过网络传输,因为 HTTP 协议不支持 这。如果我们使用不同的有线协议,这可能会改变。

我假设,由于 HTTP 协议不支持背压,这意味着也不会发出取消请求。

进一步调查,通过分析网络流量,显示只要我关闭浏览器,浏览器就会发送 TCP FIN。有没有办法配置 Netty(或其他东西),以便半关闭的连接会在发布者上触发取消事件,从而使 while 循环停止?

或者我是否必须编写类似于org.springframework.http.server.reactive.ServletHttpHandlerAdapter 的自己的适配器,并在其中实现自己的订阅者?

感谢您的帮助。

编辑: 如果没有客户端,将在尝试将数据写入套接字时引发IOException。正如您在stack trace 中看到的那样。

但这还不够好,因为可能需要一段时间才能准备好发送下一个数据块,因此检测消失的客户端需要相同的时间。正如Brian Clozel's answer 中所指出的,这是 Reactor Netty 中的一个已知问题。我尝试通过将依赖项添加到POM.xml 来改用Tomcat。像这样:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

虽然它取代了 Netty 并改用 Tomcat,但由于浏览器不显示任何数据,它似乎没有反应。但是,控制台中没有警告/信息/异常。 spring-boot-starter-webflux 这个版本 (2.0.0.RC1) 是否应该与 Tomcat 一起工作?

【问题讨论】:

【参考方案1】:

由于这是一个已知问题(请参阅 Brian Clozel's answer),我最终使用了一个 Flux 来获取我的真实数据并使用另一个以实现某种 ping/heartbeat 机制。结果,我将两者与Flux.merge() 合并在一起。

在这里您可以看到我的解决方案的简化版本:

@RestController
public class Demo 

    public interface Notification

    public static class MyData implements Notification
        …
        public boolean isEmpty()…
    

    @GetMapping(value = "/", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<? extends Notification>> getNotificationStream() 
        return Flux.merge(getEventMessageStream(), getHeartbeatStream());
    

    private Flux<ServerSentEvent<Notification>> getHeartbeatStream() 
        return Flux.interval(Duration.ofSeconds(2))
                .map(i -> ServerSentEvent.<Notification>builder().event("ping").build())
                .doFinally(signalType ->System.out.println("END"));
    

    private Flux<ServerSentEvent<MyData>> getEventMessageStream() 
        return Flux.interval(Duration.ofSeconds(30))
                .map(i -> 

                    // TODO e.g. fetch data from somewhere,
                    // if there is no data return an empty object

                    return data;
                )
                .filter(data -> !data.isEmpty())
                .map(data -> ServerSentEvent
                        .builder(data)
                        .event("message").build());
    

我将所有内容都包装为ServerSentEvent&lt;? extends Notification&gt;Notification 只是一个标记界面。我使用ServerSentEvent 类中的event 字段来区分数据和ping 事件。由于心跳Flux 会在很短的时间间隔内不断地发送事件,因此检测到客户端离开所花费的时间最多是该时间间隔的长度。请记住,我需要它,因为在我获得一些可以发送的真实数据之前可能需要一段时间,因此,它也可能需要一段时间才能检测到客户端已经消失。像这样,一旦无法发送 ping(或者可能是消息事件),它就会检测到客户端已经消失。

关于标记界面的最后一个注释,我称之为通知。这并不是真正必要的,但它提供了一些类型安全性。没有它,我们可以写 Flux&lt;ServerSentEvent&lt;?&gt;&gt; 而不是 Flux&lt;ServerSentEvent&lt;? extends Notification&gt;&gt; 作为 getNotificationStream() 方法的返回类型。或者也可以让 getHeartbeatStream() 返回Flux&lt;ServerSentEvent&lt;MyData&gt;&gt;。然而,像这样它会允许任何对象可以被发送,这是我不想要的。因此,我添加了接口。

【讨论】:

我不太明白它是如何工作的。一旦客户端断开连接,带有ping的通量会因为异常而被破坏? 我已经通过在客户端断开 wifi 进行了测试,但它不起作用。但是,当我通过关闭浏览器或浏览器选项卡进行测试时,它正在工作。那么,断开客户端wifi或互联网时如何使其工作?【参考方案2】:

我不确定为什么会这样,但我怀疑这是因为选择了生成运算符。我认为使用以下方法会起作用:

    return Flux.interval(Duration.ofMillis(500))
    .map(input -> 
        return "DATA";
    );

根据to Reactor's reference documentation, you're probably hitting the key difference between generate and push(我相信使用 generate 的非常相似的方法可能也会起作用)。

我的评论是指背压信息(Subscriber 愿意接受多少元素),但成功/错误信息是通过网络传达的。

根据您选择的 Web 服务器(Reactor Netty、Tomcat、Jetty 等),关闭客户端连接可能会导致:

服务器端收到取消信号(我认为这是 Netty 支持的) 服务器在尝试写入已关闭的连接时收到错误信号(我相信 Servlet 规范没有提供该回调,我们缺少取消信息)。

简而言之:你不需要做任何特别的事情,它应该已经得到支持,但你的Flux 实现可能是这里的实际问题。

更新: this is a known issue in Reactor Netty

【讨论】:

感谢您的回答。事实上,当我像您建议的那样实现Flux 时,它会改善这种情况,这意味着它会在尝试发送下一个数据时检测到浏览器已关闭。但是,在我的用例中,可能需要一段时间才能生成新数据。因此,它也可能需要一段时间才能检测到客户端已消失。作为一种解决方法,我试过这个:return Flux.interval(Duration.ofMillis(500)) .map(input -&gt; return "DATA || null"; ).filter(x -&gt; x != null); 但是当我过滤那些空值时,它不会检测到关闭的连接,直到它尝试发送真实数据(例如非空值)。

以上是关于Spring Boot Webflux/Netty - 检测关闭的连接的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot 2 使用 Webflux、Netty 和 HTTP2,导致证书无效,可能配置错误

Spring Webflux Netty http 和 https

spring webflux(netty)处理程序无法解析包含大于 750 字节的 json 的 ServerRequest

Webflux + Netty NIO 性能相比传统 IO 下降约 30 倍

Spring Boot 学习例子

Spring Boot 2Spring Boot CLI