RSocket思潮
Posted 工匠人生
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RSocket思潮相关的知识,希望对你有一定的参考价值。
每当有新技术涌现时,总会有些弄潮儿粉墨登场,为了KPI抑或是晋升,唾沫横飞得布道这些新技术。
当然,软件咨询师、布道师,和程序员一样,也是一种职业,但是 PUA 布道师挺无聊的 :)
今天,让我们一起不那么功利地一起了解一下RSocket到底是什么?——只是为了知识,不为了所谓的钱和利益。
概念
如上图所示,RSocket的官方网站是:http://rsocket.io/ ,官方解释是RSocket是一个二进制的协议,以异步消息的方式提供4种对等的交互模型,以字节流的方式运行在TCP, WebSockets, Aeron等传输层之上。它支持会话恢复,允许跨不同的传输连接恢复长期存在的流。RSocket适用于网络连接频繁掉线、切换和重新连接的场景,因此对移动服务器通信特别有用。
需要强调的是,RSocket是一种应用层协议,不是一个传输层的协议。它可以支持传输层的协议和技术,比如TCP和Proto Buf之类,它的重点是将反应流的实现(如TCP滑动窗口等)提升到应用层来。
RSocket是面向连接的、消息驱动的协议,内置了应用程序级的流控制。它在浏览器中和在服务器上一样工作。Web 浏览器可以服务于后端微服务的流量。它也是二进制的,它可以同样好地处理文本和二进制数据,并且可以分解有效工作负载。它将应用程序中的所有交互建模为网络原语。这意味着,你可以流化数据或执行发布 / 订阅,而无需设置应用程序队列。
它是一种支持多语言的基于Reactive编程模型的API规范,基于自定义的二进制协议之上,使用方可以直接根据接口编写代码而无需关心协议的细节。在Java领域,RSocket主要是基于Java SDK的Reactor和RxJava,以及Spring的Netty-Reactor进行Reactive编程模型的设计。
RSocket自定义的二进制协议性能高,号称是HTTP的十倍多,当然实现高性能还必须根据业务场景去适配优化。
REST和RSocket之我见
Netflix 的联合创始人兼CEO,Robert Roeser 发表过观点,认为RSocket是REST的挑战者。
https://www.infoq.com/articles/give-rest-a-rest-rsocket
RSocket的确是REST的挑战者,但是我不认为它在国内能迅速普及甚至直接淘汰掉REST。
不可否认,REST是基于HTTP实现的,他是“人类可读”的一种方便调试的方式。REST在国内而言,基于Swagger,非常方便前后端联调。国内的业务项目(不是纯技术项目)排期其实很大程度是依赖于前端团队的,曾几何时,常常听到前端团队的主管(纯管理)在会议上因为资源排期的问题,大声骂后端 “别TMD说前端做几个页面能有多少人日,有本事你做个页面吗?”。
拓展阅读:
简单二进制编码SBE https://github.com/real-logic/simple-binary-encoding
FlatBuffers https://google.github.io/flatbuffers
比使用Json快得多得多。
就算你的二进制程序将二进制转换为文本,通过网络将它转移给另一台机器,解析大量JSON和字符串导致了缓存未命中;甚至分发二进制数据对数据进行Base64编码,导致同样的数据序列化两次。你还会觉得REST过于可笑吗?
就算平均每秒 10 个请求的服务,它的 JSON 大小为 1KB,即相当于每天 860MB 的数据,或者每天 250 本《战争与和平》,没有人能读懂,就算浪费钱浪费资源,在国内,你能让前端团队放弃使用JSON吗?
至于输赢如何,就像某个公司老板认为 k8s 太冒险,只允许前进到Docker,个人认为RSocket在国内形势不太看好。
RSocket适用场景
RSocket还是可以提供二进制序列化、双向通信、多路复用、分割请求、元数据交换、流控制等能力。所以,在这些场景下,RSocket可以有很好的应用。
Dubbo支持响应式编程
RSocket对于Dubbo这类的RPC框架以及全链路追踪会产生互相的影响及变革。
https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-rsocket
参照如上代码分支,用户可以在请求参数和返回值里使用 Mono 和 Flux 类型的对象,配置方式除了协议名使用RSocket之外,没有太多区别的地方。
这只是Dubbo的RSocket的第一步,分布式调用链/故障注入的未来,也将不限于RPC。
之前链路追踪的文章可以参考一下哈
Envoy、Istio、SideCar的补充?
微服务的发展经历了SpringCloud、ServiceMesh,未来该何去何从?未来是中央Mixer还是轻量级的Agent?
RSocket对于Envoy、Istio、SideCar等等也会发生一些化学作用。
Envoy是一个基于HTTP2/1.1的Proxy,不支持多路复用、非阻塞和完美的长连接。Envoy依靠control plane来实现weave mesh,比如新版本Istio Proxy的连接池就占用了很多的性能。Spring Framework 5.2 以后据说即将把RSocket作为缺省的反应通讯协议,在多路复用、非阻塞、长连接等方面会让用户更多的接触到。
Istio虽然号称不依赖Kubernets,但是在Kubernets外部署和管理sidecar proxy还是需要的。RSocket Broker相对比较容易。
众所周知,Java Agent吃内存,SideCar升级要把全网都升级一遍。如果是众多的IoT设备,那么Service Mesh的升级是所有硬件都升级一遍吗?据说,Facebook的主要手机商选择的是RSocket这种很小且高效的SDK。
微服务配置中心、服务注册与发现变革
配置中心属于微服务的基础设施。配置推送的功能主要是主动获取配置、配置更新推送。
在之前猪猪的几篇文章都提到了配置中心的Java实现,感兴趣的读者自行撸一遍下面的文章吧 :)
基于RSocket的改造,主要分为如下几步:
通过PropertySourceLocator的重写locate方法,来实现配置的推送。
通过RSocket和Reactor的配合,完成客户端和服务端的消息交互
创建config listener负责监听配置项的变化,对标Flux的subscribe,通过@RefreshScope 进行配置刷新
服务注册和发现也可以通过类似的方法处理,这样可以将原先的REST的方式变更为RSocket的方式,代码量更为精简,性能略好一些 :)
最简单的一个案例
写到这里,很多读者会说,怎么没案例代码,我们文末提供一下 :)
官网上其实提供过示例代码,Java服务端的:
RSocketFactory.receive()
.frameDecoder(Frame::retain)
.acceptor(new PingHandler())
.transport(TcpServerTransport.create(7878))
.start()
.block()
.onClose();
Java 客户端的
Mono<RSocket> client =
RSocketFactory.connect()
.frameDecoder(Frame::retain)
.transport(TcpClientTransport.create(7878))
.start();
PingClient pingClient = new PingClient(client);
Recorder recorder = pingClient.startTracker(Duration.ofSeconds(1));
int count = 1_000;
pingClient
.startPingPong(count, recorder)
.doOnTerminate(() -> System.out.println("Sent " + count + " messages."))
.blockLast();
这里我给一个我自己编写的案例
第一步,引入Maven依赖
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-core</artifactId>
<version>0.11.15</version>
</dependency>
<dependency>
<groupId>io.rsocket</groupId>
<artifactId>rsocket-transport-netty</artifactId>
<version>0.11.15</version>
</dependency>
第二步、编写服务端代码
import io.rsocket.*;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.ByteBufPayload;
import reactor.core.publisher.Mono;
public class RSocketServer {
public static void main(String[] args) {
SocketAcceptor acceptor = (setup, sendingSocket) -> Mono.just(
new AbstractRSocket() {
@Override
public Mono<Payload> requestResponse(Payload payload) {
return Mono.just(ByteBufPayload.create("HelloWorld,Charles! " + payload.getDataUtf8()));
}
});
RSocketFactory.receive()
.frameDecoder(Frame::retain)//Enable Zero Copy
.acceptor(acceptor)
.transport(TcpServerTransport.create(9527))
.start()
.block()
.onClose()
.block();
}
}
第三步、编写客户端代码
public class RSocketClient {
public static RSocket client;
static {
client = RSocketFactory.connect()
.frameDecoder(Frame::retain)//Enable Zero Copy
.transport(TcpClientTransport.create(9527))
.start()
.block();
}
public static void main(String[] args) {
RSocketClient.client.requestResponse(DefaultPayload.create("I am RSocket Demo"))
.subscribe(payload -> System.out.println(payload.getMetadataUtf8()));
while (true);
}
}
我们先运行服务端,可以看到如下内容输出:
17:40:48.612 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
17:40:48.616 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
17:40:48.649 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: MacOS
17:40:48.652 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
17:40:48.652 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8
17:40:48.654 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
17:40:48.654 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
17:40:48.655 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
17:40:48.655 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
17:40:48.656 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
17:40:48.656 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
17:40:48.656 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available
17:40:48.656 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
17:40:48.656 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: /var/folders/9p/mxclx8tj0257q8pl7_563bs80000gn/T (java.io.tmpdir)
17:40:48.656 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
17:40:48.658 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3817865216 bytes
17:40:48.658 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
17:40:48.659 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
17:40:48.659 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
17:40:48.678 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
17:40:48.678 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
17:40:48.679 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo0 (lo0, 0:0:0:0:0:0:0:1%lo0)
17:40:48.680 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file /proc/sys/net/core/somaxconn. Default: 128
17:40:48.694 [main] DEBUG reactor.netty.tcp.TcpResources - [tcp] resources will use the default LoopResources: DefaultLoopResources {prefix=reactor-tcp, daemon=true, selectCount=8, workerCount=8}
17:40:48.694 [main] DEBUG reactor.netty.tcp.TcpResources - [tcp] resources will use the default ConnectionProvider: PooledConnectionProvider {name=tcp, poolFactory=reactor.netty.resources.ConnectionProvider$$Lambda$14/1418621776@1a968a59}
17:40:48.698 [main] DEBUG reactor.netty.resources.DefaultLoopEpoll - Default Epoll support : false
17:40:48.699 [main] DEBUG reactor.netty.resources.DefaultLoopKQueue - Default KQueue support : false
17:40:48.704 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
17:40:48.724 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
17:40:48.724 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
17:40:48.731 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
17:40:48.731 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
17:40:48.741 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
17:40:48.976 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 53814 (auto-detected)
17:40:48.980 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 6c:96:cf:ff:fe:dd:15:d5 (auto-detected)
17:40:48.993 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
17:40:48.993 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
17:40:49.023 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
17:40:49.023 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
17:40:49.023 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
17:40:49.038 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
17:40:49.038 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
17:40:49.038 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
17:40:49.115 [reactor-tcp-nio-1] DEBUG reactor.netty.tcp.TcpServer - [id: 0xc5709c4f, L:/0:0:0:0:0:0:0:0:9527] Bound new server
再运行客户端,可以看到消息发送成功,关键消息如下:
17:41:22.244 [reactor-tcp-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
17:41:22.246 [reactor-tcp-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@8354e30
17:41:22.270 [reactor-tcp-nio-4] DEBUG reactor.netty.channel.FluxReceive - [id: 0x6924804f, L:/0:0:0:0:0:0:0:1:54848 - R:/0:0:0:0:0:0:0:1:9527] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
17:41:22.343 [reactor-tcp-nio-4] DEBUG reactor.netty.ReactorNetty - [id: 0x6924804f, L:/0:0:0:0:0:0:0:1:54848 - R:/0:0:0:0:0:0:0:1:9527] Added decoder [RSocketLengthCodec] at the end of the user pipeline, full pipeline: [RSocketLengthCodec, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
17:41:22.343 [reactor-tcp-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x6924804f, L:/0:0:0:0:0:0:0:1:54848 - R:/0:0:0:0:0:0:0:1:9527] Channel connected, now 1 active connections and 0 inactive connections
17:41:22.355 [main] DEBUG io.rsocket.FrameLogger - sending -> Frame => Stream ID: 1 Type: REQUEST_RESPONSE Payload: data: "I am RSocket Demo"
17:41:22.505 [reactor-tcp-nio-4] DEBUG io.rsocket.FrameLogger - receiving -> Frame => Stream ID: 1 Type: NEXT_COMPLETE Payload: data: "HelloWorld,Charles! I am RSocket Demo"
服务端显示收到消息:
17:41:22.246 [reactor-tcp-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@8354e30
17:41:22.270 [reactor-tcp-nio-4] DEBUG reactor.netty.channel.FluxReceive - [id: 0x6924804f, L:/0:0:0:0:0:0:0:1:54848 - R:/0:0:0:0:0:0:0:1:9527] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
17:41:22.343 [reactor-tcp-nio-4] DEBUG reactor.netty.ReactorNetty - [id: 0x6924804f, L:/0:0:0:0:0:0:0:1:54848 - R:/0:0:0:0:0:0:0:1:9527] Added decoder [RSocketLengthCodec] at the end of the user pipeline, full pipeline: [RSocketLengthCodec, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
17:41:22.343 [reactor-tcp-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x6924804f, L:/0:0:0:0:0:0:0:1:54848 - R:/0:0:0:0:0:0:0:1:9527] Channel connected, now 1 active connections and 0 inactive connections
17:41:22.355 [main] DEBUG io.rsocket.FrameLogger - sending -> Frame => Stream ID: 1 Type: REQUEST_RESPONSE Payload: data: "I am RSocket Demo"
17:41:22.505 [reactor-tcp-nio-4] DEBUG io.rsocket.FrameLogger - receiving -> Frame => Stream ID: 1 Type: NEXT_COMPLETE Payload: data: "HelloWorld,Charles! I am RSocket Demo"
以上是关于RSocket思潮的主要内容,如果未能解决你的问题,请参考以下文章
Spring RSocket:基于服务注册发现的 RSocket 负载均衡
云原生实践之 RSocket 从入门到落地:Servlet vs RSocket
云原生实践之 RSocket 从入门到落地:Servlet vs RSocket