Reactor Netty websocket channel closed prematurely

[Posted]: 2021-08-15 15:05:42

[Question]:

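I have a long-running websocket client, implemented in Java with Spring Reactor and Netty (spring-boot-starter-parent 2.5.3), running against the Binance ws API. According to the specification, the websocket channel stays open for 24 hours.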

The websocket is closed unexpectedly and prematurely after about 3 minutes:

16:50:48.418 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:50:48.434 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
16:50:48.436 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
16:50:48.437 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 14
16:50:48.438 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
16:50:48.438 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
16:50:48.438 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
16:50:48.439 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: unavailable: Reflective setAccessible(true) disabled
16:50:48.439 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
16:50:48.440 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable: class io.netty.util.internal.PlatformDependent0$6 cannot access class jdk.internal.misc.Unsafe (in module java.base) because module java.base does not export jdk.internal.misc to unnamed module @1efbd816
16:50:48.440 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): unavailable
16:50:48.440 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
16:50:48.448 [main] DEBUG io.netty.util.internal.PlatformDependent - maxDirectMemory: 8388608000 bytes (maybe)
16:50:48.448 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: /tmp (java.io.tmpdir)
16:50:48.448 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
16:50:48.449 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: -1 bytes
16:50:48.450 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
16:50:48.450 [main] DEBUG io.netty.util.internal.CleanerJava9 - java.nio.ByteBuffer.cleaner(): available
16:50:48.450 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
16:50:48.460 [main] DEBUG reactor.netty.tcp.TcpResources - [http] resources will use the default LoopResources: DefaultLoopResources prefix=reactor-http, daemon=true, selectCount=8, workerCount=8
16:50:48.460 [main] DEBUG reactor.netty.tcp.TcpResources - [http] resources will use the default ConnectionProvider: reactor.netty.resources.DefaultPooledConnectionProvider@192b07fd
16:50:48.485 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
16:50:48.486 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
16:50:48.581 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
16:50:48.581 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
16:50:48.582 [main] DEBUG io.netty.util.NetUtilInitializations - Loopback interface: lo (lo, 0:0:0:0:0:0:0:1%lo)
16:50:48.583 [main] DEBUG io.netty.util.NetUtil - /proc/sys/net/core/somaxconn: 128
16:50:48.590 [main] DEBUG org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient - Connecting to wss://stream.binance.com:9443/ws
16:50:48.601 [main] DEBUG io.netty.handler.ssl.OpenSsl - netty-tcnative not in the classpath; OpenSslEngine will be unavailable.
16:50:48.712 [main] DEBUG io.netty.handler.ssl.JdkSslContext - Default protocols (JDK): [TLSv1.3, TLSv1.2, TLSv1.1, TLSv1] 
16:50:48.712 [main] DEBUG io.netty.handler.ssl.JdkSslContext - Default cipher suites (JDK): [TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, TLS_RSA_WITH_AES_128_GCM_SHA256, TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA, TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384]
16:50:48.720 [main] DEBUG reactor.netty.resources.DefaultLoopIOUring - Default io_uring support : false
16:50:48.724 [main] DEBUG io.netty.util.internal.NativeLibraryLoader - -Dio.netty.native.workdir: /tmp (io.netty.tmpdir)
16:50:48.725 [main] DEBUG io.netty.util.internal.NativeLibraryLoader - -Dio.netty.native.deleteLibAfterLoading: true
16:50:48.725 [main] DEBUG io.netty.util.internal.NativeLibraryLoader - -Dio.netty.native.tryPatchShadedId: true
16:50:48.730 [main] DEBUG io.netty.util.internal.NativeLibraryLoader - Successfully loaded the library /tmp/libnetty_transport_native_epoll_x86_6410359104745093945181.so
16:50:48.731 [main] DEBUG reactor.netty.resources.DefaultLoopEpoll - Default Epoll support : true
16:50:48.734 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
16:50:48.742 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
16:50:48.743 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
16:50:48.749 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
16:50:48.768 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating a new [http] client pool [PoolFactoryevictionInterval=PT0S, leasingStrategy=fifo, maxConnections=500, maxIdleTime=-1, maxLifeTime=-1, metricsEnabled=false, pendingAcquireMaxCount=1000, pendingAcquireTimeout=45000] for [stream.binance.com/<unresolved>:9443]
16:50:48.798 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 27223 (auto-detected)
16:50:48.799 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 28:16:ad:ff:fe:2b:7c:b7 (auto-detected)
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
16:50:48.813 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
16:50:48.813 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
16:50:48.814 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
16:50:48.828 [reactor-http-epoll-2] DEBUG reactor.netty.resources.PooledConnectionProvider - [id:d962b126] Created a new pooled channel, now: 0 active connections, 0 inactive connections and 0 pending acquire requests.
16:50:48.845 [reactor-http-epoll-2] DEBUG reactor.netty.tcp.SslProvider - [id:d962b126] SSL enabled using engine sun.security.ssl.SSLEngineImpl@55608030 and SNI stream.binance.com/<unresolved>:9443
16:50:48.852 [reactor-http-epoll-2] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
16:50:48.853 [reactor-http-epoll-2] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
16:50:48.853 [reactor-http-epoll-2] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@3ba51dc6
16:50:48.854 [reactor-http-epoll-2] DEBUG reactor.netty.transport.TransportConfig - [id:d962b126] Initialized pipeline DefaultChannelPipeline(reactor.left.sslHandler = io.netty.handler.ssl.SslHandler), (reactor.left.sslReader = reactor.netty.tcp.SslProvider$SslReadHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)
16:50:48.866 [reactor-http-epoll-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@1fb356c5
16:50:48.867 [reactor-http-epoll-1] DEBUG io.netty.resolver.dns.DnsQueryContext - [id: 0xdd7103d7] WRITE: UDP, [11524: /127.0.0.53:53], DefaultDnsQuestion(stream.binance.com. IN A)
16:50:48.869 [reactor-http-epoll-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
16:50:48.869 [reactor-http-epoll-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
16:50:48.869 [reactor-http-epoll-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
16:50:48.869 [reactor-http-epoll-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
16:50:48.869 [reactor-http-epoll-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.delayedQueue.ratio: 8
16:50:48.878 [reactor-http-epoll-1] DEBUG io.netty.resolver.dns.DnsQueryContext - [id: 0xdd7103d7] WRITE: UDP, [33872: /127.0.0.53:53], DefaultDnsQuestion(stream.binance.com. IN AAAA)
16:50:48.904 [reactor-http-epoll-1] DEBUG io.netty.resolver.dns.DnsNameResolver - [id: 0xdd7103d7] RECEIVED: UDP [11524: /127.0.0.53:53], DatagramDnsResponse(from: /127.0.0.53:53, 11524, QUERY(0), NoError(0), RD RA)
    DefaultDnsQuestion(stream.binance.com. IN A)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(OPT flags:0 udp:65494 0B)
16:50:48.907 [reactor-http-epoll-2] DEBUG reactor.netty.transport.TransportConnector - [id:d962b126] Connecting to [stream.binance.com/52.199.12.133:9443].
16:50:48.907 [reactor-http-epoll-1] DEBUG io.netty.resolver.dns.DnsNameResolver - [id: 0xdd7103d7] RECEIVED: UDP [33872: /127.0.0.53:53], DatagramDnsResponse(from: /127.0.0.53:53, 33872, QUERY(0), NoError(0), RD RA)
    DefaultDnsQuestion(stream.binance.com. IN AAAA)
    DefaultDnsRawRecord(OPT flags:0 udp:65494 0B)
16:50:49.162 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:d962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] Registering pool release on close event for channel
16:50:49.163 [reactor-http-epoll-2] DEBUG reactor.netty.resources.PooledConnectionProvider - [id:d962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] Channel connected, now: 1 active connections, 0 inactive connections and 0 pending acquire requests.
16:50:49.807 [reactor-http-epoll-2] DEBUG io.netty.handler.ssl.SslHandler - [id: 0xd962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] HANDSHAKEN: protocol:TLSv1.2 cipher suite:TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
16:50:49.808 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:d962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] onStateChange(PooledConnectionchannel=[id: 0xd962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443], [connected])
16:50:49.826 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] onStateChange(GETuri=/, connection=PooledConnectionchannel=[id: 0xd962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443], [configured])
16:50:49.826 [reactor-http-epoll-2] DEBUG reactor.netty.http.client.HttpClientConnect - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] Handler is being applied: uri=wss://stream.binance.com:9443/ws, method=GET
16:50:49.830 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] onStateChange(GETuri=/ws, connection=PooledConnectionchannel=[id: 0xd962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443], [request_prepared])
16:50:49.839 [reactor-http-epoll-2] DEBUG reactor.netty.ReactorNetty - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] Added encoder [reactor.left.httpAggregator] at the beginning of the user pipeline, full pipeline: [reactor.left.sslHandler, reactor.left.httpCodec, reactor.left.httpAggregator, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
16:50:49.839 [reactor-http-epoll-2] DEBUG reactor.netty.ReactorNetty - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] Non Removed handler: reactor.left.httpMetricsHandler, context: null, pipeline: DefaultChannelPipeline(reactor.left.sslHandler = io.netty.handler.ssl.SslHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.left.httpAggregator = io.netty.handler.codec.http.HttpObjectAggregator), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)
16:50:49.840 [reactor-http-epoll-2] DEBUG reactor.netty.http.client.HttpClientOperations - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] Attempting to perform websocket handshake with wss://stream.binance.com:9443/ws
16:50:49.846 [reactor-http-epoll-2] DEBUG io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker13 - WebSocket version 13 client handshake key: 7FNVb427OHllyiM2Clg//g==, expected response: iTvQFIKtv7xyyXvmEAooh8NZPVw=
16:50:50.122 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] onStateChange(wsuri=/ws, connection=PooledConnectionchannel=[id: 0xd962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443], [response_received])
16:50:50.135 [reactor-http-epoll-2] DEBUG org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession - [36eb4d6b] Session id "36eb4d6b" for wss://stream.binance.com:9443/ws
16:50:50.135 [reactor-http-epoll-2] DEBUG org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient - Started session '36eb4d6b' for wss://stream.binance.com:9443/ws
16:50:50.147 [reactor-http-epoll-2] DEBUG reactor.netty.ReactorNetty - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] Added decoder [reactor.left.wsFrameAggregator] at the end of the user pipeline, full pipeline: [reactor.left.sslHandler, reactor.left.httpCodec, ws-decoder, ws-encoder, reactor.left.wsFrameAggregator, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
16:50:50.149 [reactor-http-epoll-2] DEBUG reactor.netty.channel.FluxReceive - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] FluxReceivepending=0, cancelled=false, inboundDone=false, inboundError=null: subscribing inbound receiver
16:50:50.150 [reactor-http-epoll-2] INFO TRACE - onSubscribe(FluxMap.MapSubscriber)
16:50:50.150 [reactor-http-epoll-2] INFO TRACE - request(256)
16:50:50.411 [reactor-http-epoll-2] INFO TRACE - onNext(evt)
16:50:50.413 [reactor-http-epoll-2] INFO TRACE - request(1)
...
16:52:16.652 [reactor-http-epoll-2] INFO TRACE - onNext(evt)
16:52:16.652 [reactor-http-epoll-2] INFO TRACE - request(1)
16:52:17.168 [reactor-http-epoll-2] DEBUG reactor.netty.resources.PooledConnectionProvider - [id:d962b126-1, L:/192.168.1.5:44690 ! R:stream.binance.com/52.199.12.133:9443] Channel closed, now: 0 active connections, 0 inactive connections and 0 pending acquire requests.
16:52:17.169 [reactor-http-epoll-2] DEBUG reactor.netty.ReactorNetty - [id:d962b126-1, L:/192.168.1.5:44690 ! R:stream.binance.com/52.199.12.133:9443] Non Removed handler: reactor.left.httpAggregator, context: null, pipeline: DefaultChannelPipeline(reactor.left.sslHandler = io.netty.handler.ssl.SslHandler), (ws-decoder = io.netty.handler.codec.http.websocketx.WebSocket13FrameDecoder), (ws-encoder = io.netty.handler.codec.http.websocketx.WebSocket13FrameEncoder), (reactor.left.wsFrameAggregator = io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)
A completed
A terminated
16:52:17.172 [reactor-http-epoll-2] INFO TRACE - onComplete()
B completed
B terminated
C success
C terminated
16:52:17.177 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:d962b126, L:/192.168.1.5:44690 ! R:stream.binance.com/52.199.12.133:9443] onStateChange(wsuri=/ws, connection=PooledConnectionchannel=[id: 0xd962b126, L:/192.168.1.5:44690 ! R:stream.binance.com/52.199.12.133:9443], [response_completed])
16:52:17.177 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:d962b126, L:/192.168.1.5:44690 ! R:stream.binance.com/52.199.12.133:9443] onStateChange(wsuri=/ws, connection=PooledConnectionchannel=[id: 0xd962b126, L:/192.168.1.5:44690 ! R:stream.binance.com/52.199.12.133:9443], [disconnecting])

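I tried to reproduce the issue with another technology (for example JavaScript), but there everything works fine. It looks like the channel is being closed, so I tried tuning the ChannelOptions at the TcpClient level... still no luck!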

// TcpClient is immutable: each option(...) call returns a new instance,
// so the calls have to be chained (or the result reassigned) to take effect.
TcpClient wsTcp = TcpClient.create()
        .option(ChannelOption.AUTO_CLOSE, Boolean.FALSE)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.MAX_VALUE)
        .option(ChannelOption.AUTO_READ, Boolean.TRUE)
        .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
        .option(ChannelOption.SO_TIMEOUT, Integer.MAX_VALUE);

Here is a Java sample that reproduces the issue:

package test;

import java.net.URI;
import java.util.concurrent.CountDownLatch;

import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class WsTest {

    public static void main(String[] args) throws InterruptedException {
        // Never counted down: it only keeps the main thread alive while the event loop runs.
        CountDownLatch latch = new CountDownLatch(1);
        ReactorNettyWebSocketClient wsclient = new ReactorNettyWebSocketClient();
        wsclient.setMaxFramePayloadLength(Integer.MAX_VALUE);
        EmitterProcessor<String> output = EmitterProcessor.create();
        // Send the SUBSCRIBE request, then relay every received frame to 'output'.
        Mono<Void> execMono = wsclient.execute(URI.create("wss://stream.binance.com:9443/ws"),
                session -> session.send(Flux.just(session.textMessage("{\"method\": \"SUBSCRIBE\",\"params\":[\"!ticker@arr\"],\"id\": 1}")))
                        .thenMany(session
                                .receive()
                                .doOnCancel(() -> System.out.println("A cancelled"))
                                .doOnComplete(() -> System.out.println("A completed"))
                                .doOnTerminate(() -> System.out.println("A terminated"))
                                .map(x -> "evt")
                                .log("TRACE")
                                .subscribeWith(output).then())
                        .then());

        // The websocket connection is only opened once 'output' is subscribed to.
        output.doOnCancel(() -> System.out.println("B cancelled"))
                .doOnComplete(() -> System.out.println("B completed"))
                .doOnTerminate(() -> System.out.println("B terminated"))
                .doOnSubscribe(s -> execMono
                        .doOnCancel(() -> System.out.println("C cancelled"))
                        .doOnSuccess(x -> System.out.println("C success"))
                        .doOnTerminate(() -> System.out.println("C terminated"))
                        .subscribe())
                .subscribe();

        latch.await();
    }
}

I don't understand why I get a completed/terminated event from the ReactorNettyWebSocketClient WebSocketHandler.

Thanks for your help,
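
The SUBSCRIBE request in the sample above is written as a hand-escaped string literal, which makes it easy to lose a brace or a quote. As a minimal sketch, the same text can be assembled by a small helper. The class name `BinanceRequests` and the method `subscribe` are hypothetical and not part of any library. The sketch also assumes that stream names need no JSON escaping beyond quotes and backslashes.

```java
// Sketch only: builds the SUBSCRIBE request text used in the sample above.
// BinanceRequests and subscribe(...) are hypothetical names, not a library API.
final class BinanceRequests {

    private BinanceRequests() {
    }

    /** Returns a JSON request that subscribes to the given stream names. */
    static String subscribe(int id, String... streams) {
        StringBuilder json = new StringBuilder("{\"method\":\"SUBSCRIBE\",\"params\":[");
        for (int i = 0; i < streams.length; i++) {
            if (i > 0) {
                json.append(',');
            }
            json.append('"').append(escape(streams[i])).append('"');
        }
        return json.append("],\"id\":").append(id).append('}').toString();
    }

    /** Assumption: only backslashes and double quotes need escaping in stream names. */
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        System.out.println(subscribe(1, "!ticker@arr"));
    }
}
```

In the sample, the result of `BinanceRequests.subscribe(1, "!ticker@arr")` could be passed to `session.textMessage(...)` in place of the literal.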

[Answer 1]:

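I finally found the root cause.

The underlying error was: java websocket 1006 Unexpected Status of SSLEngineResult after an unwrap() operation

After some investigation I found that the close code was 1006, which means that, as seen by the client, the connection was closed abnormally, as described in RFC 6455: https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1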

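1006 is a reserved value and MUST NOT be set as a status code in a Close control frame by an endpoint. It is designated for use in applications expecting a status code to indicate that the connection was closed abnormally, e.g., without sending or receiving a Close control frame.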
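
The quoted passage implies that 1006 never travels over the wire: the local endpoint reports it when the connection drops without a Close frame. The following is a minimal sketch of how a client might tell such locally generated codes apart from ones the peer actually sent. The class `CloseCodes` and its methods are hypothetical names, not part of Spring or Reactor Netty, and only a handful of codes from RFC 6455 section 7.4.1 are covered.

```java
// Sketch only: classifies a few WebSocket close codes per RFC 6455 section 7.4.1.
// CloseCodes, isLocalOnly and describe are hypothetical names, not a library API.
final class CloseCodes {

    private CloseCodes() {
    }

    /** True for codes that RFC 6455 forbids in a Close frame and reserves for local reporting. */
    static boolean isLocalOnly(int code) {
        return code == 1005 || code == 1006 || code == 1015;
    }

    /** Short human-readable meaning for a few codes defined in RFC 6455 section 7.4.1. */
    static String describe(int code) {
        switch (code) {
            case 1000: return "normal closure";
            case 1001: return "endpoint going away";
            case 1002: return "protocol error";
            case 1005: return "no status code present (local only)";
            case 1006: return "abnormal closure, no Close frame seen (local only)";
            case 1015: return "TLS handshake failure (local only)";
            default:   return "other code";
        }
    }

    public static void main(String[] args) {
        int code = 1006; // the code reported in this answer
        System.out.println(code + ": " + describe(code) + ", local only = " + isLocalOnly(code));
    }
}
```

A local-only code such as 1006 points at the transport (network, TLS, proxy) rather than at a deliberate close by the server.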

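At that point I switched from a WiFi connection to a wired LAN connection, and the problem disappeared immediately. My WiFi router could not handle the heavy load properly.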
