如何在 Spring Boot 客户端应用程序中创建不共享底层网络连接的套接字

Posted

技术标签:

【中文标题】如何在 Spring Boot 客户端应用程序中创建不共享底层网络连接的套接字【英文标题】:How do I create rSockets that don't share an underlying network connection in springboot client app 【发布时间】:2021-07-22 07:26:38 【问题描述】:

我想创建从 springboot 客户端应用程序到单个服务器的多个 rsocket 连接,而不共享相同的底层网络连接。

这个用例是负载测试 - 我想模拟许多 Web 客户端与我的服务器建立 rSocket 连接,并希望从单个应用程序实例创建(比如说)100 个这样的连接,因此它比操作和监控更容易运行 100 个独立的进程。

我正在这样做:

RSocketRequester requester = rSocketRequesterBuilder
   .dataMimeType(MediaType.APPLICATION_JSON)
   .setupMetadata(jwt, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE)
   .rsocketStrategies(builder -> builder.encoder(new BearerTokenAuthenticationEncoder()))
   .websocket(serverUri);
requester.route("my.endpoint").retrieveFlux(byte[].class);

这可以很好地与服务器建立多个连接并获取数据。但是websocket(),根据其文档,似乎在重复使用相同的底层共享连接,这意味着它不能充分模拟许多独立连接,从而在服务器上增加负载。

知道怎么做吗?

我使用的是 Springboot 2.4.4。

更新

我现在认为我成功地建立了两个连接,但是我的第二个连接请求以某种方式继承并附加到第一个连接的元数据,而不是拥有自己的单独元数据。我关注的元数据是 JWT。

这是来自服务器端的日志记录。在我看来,第二个设置框架有两个 JWT,每个都以 message/x.rsocket.authentication.bearer.v0 开头;第一个 JWT 之后是第二个 JWT。

2021-05-03 17:52:20.501 DEBUG 19737 --- [ctor-http-nio-2] reactor.netty.transport.ServerTransport  : [id:98c2fa05, L:/0:0:0:0:0:0:0:0:7080] Bound new server
2021-05-03 17:52:20.506  INFO 19737 --- [           main] o.s.b.rsocket.netty.NettyRSocketServer   : Netty RSocket started on port(s): 7080
2021-05-03 17:52:20.520  INFO 19737 --- [           main] c.p.pano2.rx.RxServicesApplication       : Started RxServicesApplication in 2.616 seconds (JVM running for 3.152)
2021-05-03 17:52:25.799 DEBUG 19737 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id:380596e7, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] New http connection, requesting read
2021-05-03 17:52:25.800 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.transport.TransportConfig  : [id:380596e7, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Initialized pipeline DefaultChannelPipeline(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)
2021-05-03 17:52:25.826 DEBUG 19737 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id:380596e7, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Increasing pending responses, now 1
2021-05-03 17:52:25.831 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.ReactorNetty               : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Added decoder [PongHandler] at the end of the user pipeline, full pipeline: [reactor.left.httpCodec, reactor.left.httpTrafficHandler, PongHandler, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
2021-05-03 17:52:25.832 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.http.server.HttpServer     : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Handler is being applied: io.rsocket.transport.netty.server.WebsocketServerTransport$$Lambda$1058/0x0000000800720c40@cb577eb
2021-05-03 17:52:25.837 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.ReactorNetty               : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Removed handler: reactor.left.httpTrafficHandler, pipeline: DefaultChannelPipeline(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)
2021-05-03 17:52:25.837 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.ReactorNetty               : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Non Removed handler: reactor.left.accessLogHandler, context: null, pipeline: DefaultChannelPipeline(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)
2021-05-03 17:52:25.837 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.ReactorNetty               : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Non Removed handler: reactor.left.httpMetricsHandler, context: null, pipeline: DefaultChannelPipeline(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)
2021-05-03 17:52:25.870 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.channel.FluxReceive        : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] FluxReceivepending=0, cancelled=false, inboundDone=false, inboundError=null: subscribing inbound receiver
2021-05-03 17:52:25.893 DEBUG 19737 --- [ctor-http-nio-3] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b100000000 Length: 300
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 29 6d 65 73 73 61 67 65 2f 78 2e 72 73 6f 63 6b |)message/x.rsock|
|00000010| 65 74 2e 61 75 74 68 65 6e 74 69 63 61 74 69 6f |et.authenticatio|
|00000020| 6e 2e 62 65 61 72 65 72 2e 76 30 00 00 b0 65 79 |n.bearer.v0...ey|
|00000030| 4a 68 62 47 63 69 4f 69 4a 49 55 7a 49 31 4e 69 |JhbGciOiJIUzI1Ni|
|00000040| 4a 39 2e 65 79 4a 71 64 47 6b 69 4f 69 4a 56 55 |J9.eyJqdGkiOiJVU|
|00000050| 30 56 53 4d 53 49 73 49 6d 6c 68 64 43 49 36 4d |0VSMSIsImlhdCI6M|
|00000060| 54 59 79 4d 44 41 33 4f 44 63 30 4e 53 77 69 63 |TYyMDA3ODc0NSwic|
|00000070| 33 56 69 49 6a 6f 69 56 56 4e 46 55 6a 45 69 4c |3ViIjoiVVNFUjEiL|
|00000080| 43 4a 70 63 33 4d 69 4f 69 4a 6d 59 57 74 6c 49 |CJpc3MiOiJmYWtlI|
|00000090| 47 6c 7a 63 33 56 6c 63 69 49 73 49 6d 56 34 63 |Glzc3VlciIsImV4c|
|000000a0| 43 49 36 4d 54 59 79 4d 44 45 32 4e 54 45 30 4e |CI6MTYyMDE2NTE0N|
|000000b0| 58 30 2e 58 33 6e 79 62 6b 4a 44 51 4f 67 36 7a |X0.X3nybkJDQOg6z|
|000000c0| 53 7a 77 39 68 63 68 4d 54 4b 34 6e 71 48 63 34 |Szw9hchMTK4nqHc4|
|000000d0| 4c 6f 47 36 32 73 7a 73 6e 47 58 54 79 77       |LoG62szsnGXTyw  |
+--------+-------------------------------------------------+----------------+
Data:

2021-05-03 17:52:25.929  INFO 19737 --- [ctor-http-nio-3] c.p.p.security.FakeReactiveJwtDecoder    : Decoding jwt 1040895620 subject USER1
2021-05-03 17:52:25.947 DEBUG 19737 --- [ctor-http-nio-3] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 22 InitialRequestN: 9223372036854775807
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 05 04 74 65 73 74                      |.....test       |
+--------+-------------------------------------------------+----------------+
Data:

2021-05-03 17:52:25.965  INFO 19737 --- [ctor-http-nio-3] c.p.pano2.rx.web.SystemController        : Test connection from USER1 null
2021-05-03 17:52:28.981 DEBUG 19737 --- [     parallel-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 19
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 20 55 53 45 52 31 20 30          |Hello USER1 0   |
+--------+-------------------------------------------------+----------------+
2021-05-03 17:52:30.736 DEBUG 19737 --- [ctor-http-nio-4] r.n.http.server.HttpServerOperations     : [id:d82b3f3a, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] New http connection, requesting read
2021-05-03 17:52:30.736 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.transport.TransportConfig  : [id:d82b3f3a, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Initialized pipeline DefaultChannelPipeline(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] r.n.http.server.HttpServerOperations     : [id:d82b3f3a, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Increasing pending responses, now 1
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Added decoder [PongHandler] at the end of the user pipeline, full pipeline: [reactor.left.httpCodec, reactor.left.httpTrafficHandler, PongHandler, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.http.server.HttpServer     : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Handler is being applied: io.rsocket.transport.netty.server.WebsocketServerTransport$$Lambda$1058/0x0000000800720c40@cb577eb
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Removed handler: reactor.left.httpTrafficHandler, pipeline: DefaultChannelPipeline(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Non Removed handler: reactor.left.accessLogHandler, context: null, pipeline: DefaultChannelPipeline(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Non Removed handler: reactor.left.httpMetricsHandler, context: null, pipeline: DefaultChannelPipeline(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)
2021-05-03 17:52:30.752 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.channel.FluxReceive        : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] FluxReceivepending=0, cancelled=false, inboundDone=false, inboundError=null: subscribing inbound receiver
2021-05-03 17:52:30.755 DEBUG 19737 --- [ctor-http-nio-4] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b100000000 Length: 522
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 29 6d 65 73 73 61 67 65 2f 78 2e 72 73 6f 63 6b |)message/x.rsock|
|00000010| 65 74 2e 61 75 74 68 65 6e 74 69 63 61 74 69 6f |et.authenticatio|
|00000020| 6e 2e 62 65 61 72 65 72 2e 76 30 00 00 b0 65 79 |n.bearer.v0...ey|
|00000030| 4a 68 62 47 63 69 4f 69 4a 49 55 7a 49 31 4e 69 |JhbGciOiJIUzI1Ni|
|00000040| 4a 39 2e 65 79 4a 71 64 47 6b 69 4f 69 4a 56 55 |J9.eyJqdGkiOiJVU|
|00000050| 30 56 53 4d 53 49 73 49 6d 6c 68 64 43 49 36 4d |0VSMSIsImlhdCI6M|
|00000060| 54 59 79 4d 44 41 33 4f 44 63 30 4e 53 77 69 63 |TYyMDA3ODc0NSwic|
|00000070| 33 56 69 49 6a 6f 69 56 56 4e 46 55 6a 45 69 4c |3ViIjoiVVNFUjEiL|
|00000080| 43 4a 70 63 33 4d 69 4f 69 4a 6d 59 57 74 6c 49 |CJpc3MiOiJmYWtlI|
|00000090| 47 6c 7a 63 33 56 6c 63 69 49 73 49 6d 56 34 63 |Glzc3VlciIsImV4c|
|000000a0| 43 49 36 4d 54 59 79 4d 44 45 32 4e 54 45 30 4e |CI6MTYyMDE2NTE0N|
|000000b0| 58 30 2e 58 33 6e 79 62 6b 4a 44 51 4f 67 36 7a |X0.X3nybkJDQOg6z|
|000000c0| 53 7a 77 39 68 63 68 4d 54 4b 34 6e 71 48 63 34 |Szw9hchMTK4nqHc4|
|000000d0| 4c 6f 47 36 32 73 7a 73 6e 47 58 54 79 77 29 6d |LoG62szsnGXTyw)m|
|000000e0| 65 73 73 61 67 65 2f 78 2e 72 73 6f 63 6b 65 74 |essage/x.rsocket|
|000000f0| 2e 61 75 74 68 65 6e 74 69 63 61 74 69 6f 6e 2e |.authentication.|
|00000100| 62 65 61 72 65 72 2e 76 30 00 00 b0 65 79 4a 68 |bearer.v0...eyJh|
|00000110| 62 47 63 69 4f 69 4a 49 55 7a 49 31 4e 69 4a 39 |bGciOiJIUzI1NiJ9|
|00000120| 2e 65 79 4a 71 64 47 6b 69 4f 69 4a 56 55 30 56 |.eyJqdGkiOiJVU0V|
|00000130| 53 4d 69 49 73 49 6d 6c 68 64 43 49 36 4d 54 59 |SMiIsImlhdCI6MTY|
|00000140| 79 4d 44 41 33 4f 44 63 31 4d 43 77 69 63 33 56 |yMDA3ODc1MCwic3V|
|00000150| 69 49 6a 6f 69 56 56 4e 46 55 6a 49 69 4c 43 4a |iIjoiVVNFUjIiLCJ|
|00000160| 70 63 33 4d 69 4f 69 4a 6d 59 57 74 6c 49 47 6c |pc3MiOiJmYWtlIGl|
|00000170| 7a 63 33 56 6c 63 69 49 73 49 6d 56 34 63 43 49 |zc3VlciIsImV4cCI|
|00000180| 36 4d 54 59 79 4d 44 45 32 4e 54 45 31 4d 48 30 |6MTYyMDE2NTE1MH0|
|00000190| 2e 4a 33 63 63 52 68 72 31 43 4b 70 45 43 58 66 |.J3ccRhr1CKpECXf|
|000001a0| 45 51 53 31 5a 49 63 71 4f 6d 76 57 31 52 52 4d |EQS1ZIcqOmvW1RRM|
|000001b0| 43 38 67 53 76 4a 76 45 54 44 64 67             |C8gSvJvETDdg    |
+--------+-------------------------------------------------+----------------+
Data:

2021-05-03 17:52:30.757  INFO 19737 --- [ctor-http-nio-4] c.p.p.security.FakeReactiveJwtDecoder    : Decoding jwt 1994845380 subject USER1
2021-05-03 17:52:30.757 DEBUG 19737 --- [ctor-http-nio-4] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 22 InitialRequestN: 9223372036854775807
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 05 04 74 65 73 74                      |.....test       |
+--------+-------------------------------------------------+----------------+
Data:

2021-05-03 17:52:30.758  INFO 19737 --- [ctor-http-nio-4] c.p.pano2.rx.web.SystemController        : Test connection from USER1 null
2021-05-03 17:52:31.967 DEBUG 19737 --- [     parallel-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 19
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 20 55 53 45 52 31 20 31          |Hello USER1 1   |
+--------+-------------------------------------------------+----------------+
2021-05-03 17:52:33.765 DEBUG 19737 --- [     parallel-4] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 19
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 20 55 53 45 52 31 20 30          |Hello USER1 0   |
+--------+-------------------------------------------------+----------------+

我无法从客户端连接我从窃听日志中看到的内容;帧记录器中的编码似乎有所不同,可读性较差,但看起来第二个请求比第一个请求长。 (省略日志以保持在 30k 字符限制内)

我用来生成两个客户端请求的代码是:

  private void testRequest(String user) 
        String jwt = FakeJwtUtils.createJwt(user, user, 24*60*60*1000L);
        URI uri = config.getUri();
        String baseUrl = uri.getHost() + ":" + uri.getPort();
        HttpClient httpClient = HttpClient.newConnection().baseUrl(baseUrl).wiretap(true);  // newConnection() to avoid shared connection
        WebsocketClientTransport transport = WebsocketClientTransport.create(httpClient, uri.getPath());
        try 
            log.info("Setting up rSocketRequesterBuilder for jwt sub : ", FakeJwtUtils.parseClaimsAsMap(jwt).get("sub"), jwt);
         catch (Exception ignore)  
        var requesterBuilder = rSocketRequesterBuilder
                .dataMimeType(MediaType.APPLICATION_JSON)
                .setupMetadata(jwt, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE)
                .rsocketStrategies(builder -> builder.encoder(new BearerTokenAuthenticationEncoder()))
                .transport(transport);
        requesterBuilder.route("test").retrieveFlux(byte[].class).subscribe(bytes -> log.info("rcvd: ", new String(bytes)));
    

任何指针将不胜感激!

【问题讨论】:

没有一个具体的例子很难,可能你只需要将 Spring bean 定义更改为原型范围而不是单例。你能扩展你的例子吗? 感谢您的回复。我添加了更多细节——尽管实际上我现在认为我成功地建立了 2 个套接字连接,但是在启动第二个连接时以某种方式将元数据加倍。将更新上面的帖子。 【参考方案1】:

我正在回答我自己的问题,以防其他搜索者和我有类似的困惑。

根本问题是我重复使用RSocketRequester.Builder 为我的多个请求创建多个请求者。这导致了多个问题;最大的问题是我有效地(在上面的代码中)在同一个构建器上多次调用setupMetadata(jwt, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE),其效果是附加 JWT 条目而不是像我预期的那样设置或替换。

解决方案似乎只是为每个连接创建一个新的构建器:

RSocketRequester.builder()
     .dataMimeType(MediaType.APPLICATION_JSON)
     .setupMetadata(jwt, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE)
     .rsocketStrategies(b -> b.encoder(new BearerTokenAuthenticationEncoder()))
     .websocket(config.getUri())
     .route("test").retrieveFlux(byte[].class).subscribe();

【讨论】:

以上是关于如何在 Spring Boot 客户端应用程序中创建不共享底层网络连接的套接字的主要内容,如果未能解决你的问题,请参考以下文章

我如何在 Spring Boot/MVC 中创建错误处理程序(404、500...)

如何在 Spring Boot 应用程序中创建第二个 RedisTemplate 实例

如何在 Spring Boot 中创建不同的 ThreadPoolTask​​Executor? [复制]

如何在 Spring Boot 中创建自定义错误页面

如何在 Spring Boot 中创建一个调用包含构造函数注入的服务的测试?

如何在spring boot webflux中创建动态过滤器?