如何在 Spring Boot 应用程序中关闭客户端 sse 连接

Posted

技术标签:

【中文标题】如何在 Spring Boot 应用程序中关闭客户端 sse 连接【英文标题】:How to close client sse connection in spring boot application 【发布时间】:2021-05-17 05:51:55 【问题描述】:

我有一个 Spring Boot 应用程序,我必须使用 SSE 连接到一些外部服务。 WebClient 建立连接,然后我使用 Flux 来读取响应。一切正常,但问题是连接保持打开状态,因为该过程并非旨在每次在该第 3 方服务中达到终点。我想作为客户端手动关闭连接,因为我知道这个连接应该何时完成。我该怎么做?

建立连接:

private Flux<ServerSentEvent<String>> connect(String accessToken) 
    TcpClient timeoutClient = createTimeoutClient();
    ReactorClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector(HttpClient.from(timeoutClient));
    String url = npzServerBaseUrl+uniqueCodePath;
    WebClient client = WebClient
            .builder()
            .clientConnector(reactorClientHttpConnector)
            .defaultHeader(HttpHeaders.AUTHORIZATION, Naming.TOKEN_PREFIX + accessToken)
            .baseUrl(url)
            .build();

    ParameterizedTypeReference<ServerSentEvent<String>> type
            = new ParameterizedTypeReference<ServerSentEvent<String>>() ;
    return client.get()
            .retrieve()
            .onStatus(HttpStatus::is4xxClientError, clientResponse -> 
                String msg = "Error from server: "+clientResponse.statusCode().toString();
                        //invalidate access token
                        if (clientResponse.statusCode().value()==401) 
                            //remove invalid token and connect again
                            loginContext.invalidToken(accessToken);
                            return Mono.error(new InvalidNpzToken(msg));
                        
                        return Mono.error(new IllegalStateException(msg));
                    
            )
            .onStatus(HttpStatus::is5xxServerError, clientResponse ->
                    Mono.error(new IllegalStateException("Error from server: "+clientResponse.statusCode().toString()))
            )
            .bodyToFlux(type);


private TcpClient createTimeoutClient() 
    return TcpClient.create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, SECONDS*1000)
            .option(EpollChannelOption.TCP_USER_TIMEOUT, SECONDS*1000)
            .doOnConnected(
                    c -> c.addHandlerLast(new ReadTimeoutHandler(SECONDS))
                            .addHandlerLast(new WriteTimeoutHandler(SECONDS)));

处理内容:

Flux<ServerSentEvent<String>> eventStream = connect(accessToken);

    eventStream.subscribe(
            content -> 
                log.info("Time:  - event: name[], id [], content[] ",
                    LocalTime.now(), content.event(), content.id(), content.data());
                if ("uuid".equals(content.event().trim())) 
                    listener.receivedUniqueCode(content.data().trim());
                 else if ("code".equals(content.event().trim())) 
                    listener.receivedCode(content.data().trim());
                
            ,
            (Throwable error) -> 
                if (error instanceof InvalidToken) 
                    log.error("Error receiving SSE", error);
                    //let's retry connection as token has expired
                    getCode(request, listener);
                
            ,
            () -> log.info("Connection closed!"));

我期望的是我可以调用 connection.close() 或类似的东西并且连接将被关闭。

谢谢

如果需要,请提供更多信息。

进口:

import io.netty.channel.ChannelOption;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;

春季开机:

<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>

【问题讨论】:

【参考方案1】:

eventStream.subscribe() 返回一个reactor.core.Disposable

您可以在上面调用dispose()取消订阅和底层资源。

【讨论】:

谢谢。我实际上是在阅读这种方法的文档,但不是一直到最后一行:)

以上是关于如何在 Spring Boot 应用程序中关闭客户端 sse 连接的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spring Boot 中关闭嵌入式 Tomcat?

如何在我的新 Spring Boot 项目中关闭 Spring Security?

如何配置在 Spring + JDBC 中关闭自动提交?

如何在 Spring Security 中关闭 HTTP 会话超时?

使用simplejdbccall时如何在spring中关闭refcursor

极速开发之Spring Boot五种热部署方式