RSocket思潮

Posted 工匠人生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RSocket思潮相关的知识,希望对你有一定的参考价值。


当前浏览器不支持播放音乐或语音,请在微信或其他浏览器中播放 RSocket思潮

每当有新技术涌现时,总会有些弄潮儿粉墨登场,为了KPI抑或是晋升,唾沫横飞得布道这些新技术。


当然,软件咨询师、布道师,和程序员一样,也是一种职业,但是 PUA 布道师挺无聊的 :)


今天,让我们一起不那么功利地一起了解一下RSocket到底是什么?——只是为了知识,不为了所谓的钱和利益。

概念

RSocket思潮

如上图所示,RSocket的官方网站是:http://rsocket.io/ ,官方解释是RSocket是一个二进制的协议,以异步消息的方式提供4种对等的交互模型,以字节流的方式运行在TCP, WebSockets, Aeron等传输层之上。它支持会话恢复,允许跨不同的传输连接恢复长期存在的流。RSocket适用于网络连接频繁掉线、切换和重新连接的场景,因此对移动服务器通信特别有用。

需要强调的是,RSocket是一种应用层协议,不是一个传输层的协议。它可以支持传输层的协议和技术,比如TCP和Proto Buf之类,它的重点是将反应流的实现(如TCP滑动窗口等)提升到应用层来。

RSocket是面向连接的、消息驱动的协议,内置了应用程序级的流控制。它在浏览器中和在服务器上一样工作。Web 浏览器可以服务于后端微服务的流量。它也是二进制的,它可以同样好地处理文本和二进制数据,并且可以分解有效工作负载。它将应用程序中的所有交互建模为网络原语。这意味着,你可以流化数据或执行发布 / 订阅,而无需设置应用程序队列。

它是一种支持多语言的基于Reactive编程模型的API规范,基于自定义的二进制协议之上,使用方可以直接根据接口编写代码而无需关心协议的细节。在Java领域,RSocket主要是基于Java SDK的Reactor和RxJava,以及Spring的Netty-Reactor进行Reactive编程模型的设计。

RSocket自定义的二进制协议性能高,号称是HTTP的十倍多,当然实现高性能还必须根据业务场景去适配优化。

REST和RSocket之我见

Netflix 的联合创始人兼CEO,Robert Roeser 发表过观点,认为RSocket是REST的挑战者。

 https://www.infoq.com/articles/give-rest-a-rest-rsocket 

RSocket的确是REST的挑战者,但是我不认为它在国内能迅速普及甚至直接淘汰掉REST。

不可否认,REST是基于HTTP实现的,他是“人类可读”的一种方便调试的方式。REST在国内而言,基于Swagger,非常方便前后端联调。国内的业务项目(不是纯技术项目)排期其实很大程度是依赖于前端团队的,曾几何时,常常听到前端团队的主管(纯管理)在会议上因为资源排期的问题,大声骂后端 “别TMD说前端做几个页面能有多少人日,有本事你做个页面吗?”。

拓展阅读:


简单二进制编码SBE https://github.com/real-logic/simple-binary-encoding 

FlatBuffers https://google.github.io/flatbuffers


 比使用Json快得多得多。

就算你的二进制程序将二进制转换为文本,通过网络将它转移给另一台机器,解析大量JSON和字符串导致了缓存未命中;甚至分发二进制数据对数据进行Base64编码,导致同样的数据序列化两次。你还会觉得REST过于可笑吗?

就算平均每秒 10 个请求的服务,它的 JSON 大小为 1KB,即相当于每天 860MB 的数据,或者每天 250 本《战争与和平》,没有人能读懂,就算浪费钱浪费资源,在国内,你能让前端团队放弃使用JSON吗?

至于输赢如何,就像某个公司老板认为 k8s 太冒险,只允许前进到Docker,个人认为RSocket在国内形势不太看好。

RSocket适用场景

RSocket还是可以提供二进制序列化、双向通信、多路复用、分割请求、元数据交换、流控制等能力。所以,在这些场景下,RSocket可以有很好的应用。

Dubbo支持响应式编程

RSocket对于Dubbo这类的RPC框架以及全链路追踪会产生互相的影响及变革。

https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-rsocket

参照如上代码分支,用户可以在请求参数和返回值里使用 Mono 和 Flux 类型的对象,配置方式除了协议名使用RSocket之外,没有太多区别的地方。

这只是Dubbo的RSocket的第一步,分布式调用链/故障注入的未来,也将不限于RPC。

之前链路追踪的文章可以参考一下哈


Envoy、Istio、SideCar的补充?

微服务的发展经历了SpringCloud、ServiceMesh,未来该何去何从?未来是中央Mixer还是轻量级的Agent?

RSocket对于Envoy、Istio、SideCar等等也会发生一些化学作用。

Envoy是一个基于HTTP2/1.1的Proxy,不支持多路复用、非阻塞和完美的长连接。Envoy依靠control plane来实现weave mesh,比如新版本Istio Proxy的连接池就占用了很多的性能。Spring Framework 5.2 以后据说即将把RSocket作为缺省的反应通讯协议,在多路复用、非阻塞、长连接等方面会让用户更多的接触到。

Istio虽然号称不依赖Kubernets,但是在Kubernets外部署和管理sidecar proxy还是需要的。RSocket Broker相对比较容易。

众所周知,Java Agent吃内存,SideCar升级要把全网都升级一遍。如果是众多的IoT设备,那么Service Mesh的升级是所有硬件都升级一遍吗?据说,Facebook的主要手机商选择的是RSocket这种很小且高效的SDK。

微服务配置中心、服务注册与发现变革

RSocket思潮

配置中心属于微服务的基础设施。配置推送的功能主要是主动获取配置、配置更新推送。

在之前猪猪的几篇文章都提到了配置中心的Java实现,感兴趣的读者自行撸一遍下面的文章吧  :)






基于RSocket的改造,主要分为如下几步:

  1. 通过PropertySourceLocator的重写locate方法,来实现配置的推送。

  2. 通过RSocket和Reactor的配合,完成客户端和服务端的消息交互

  3. 创建config listener负责监听配置项的变化,对标Flux的subscribe,通过@RefreshScope 进行配置刷新

服务注册和发现也可以通过类似的方法处理,这样可以将原先的REST的方式变更为RSocket的方式,代码量更为精简,性能略好一些 :)

最简单的一个案例

写到这里,很多读者会说,怎么没案例代码,我们文末提供一下 :)

官网上其实提供过示例代码,Java服务端的:

RSocketFactory.receive()
    .frameDecoder(Frame::retain)
    .acceptor(new PingHandler())
    .transport(TcpServerTransport.create(7878))
    .start()
    .block()
    .onClose();

Java 客户端的

Mono<RSocket> client =
    RSocketFactory.connect()
        .frameDecoder(Frame::retain)
        .transport(TcpClientTransport.create(7878))
        .start();

PingClient pingClient = new PingClient(client);

Recorder recorder = pingClient.startTracker(Duration.ofSeconds(1));

int count = 1_000;

pingClient
    .startPingPong(count, recorder)
    .doOnTerminate(() -> System.out.println("Sent " + count + " messages."))
    .blockLast();

这里我给一个我自己编写的案例

第一步,引入Maven依赖

        <dependency>
            <groupId>io.rsocket</groupId>
            <artifactId>rsocket-core</artifactId>
            <version>0.11.15</version>
        </dependency>

        <dependency>
            <groupId>io.rsocket</groupId>
            <artifactId>rsocket-transport-netty</artifactId>
            <version>0.11.15</version>
        </dependency>

第二步、编写服务端代码

import io.rsocket.*;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.ByteBufPayload;
import reactor.core.publisher.Mono;

public class RSocketServer {

    public static void main(String[] args) {
        SocketAcceptor acceptor = (setup, sendingSocket) -> Mono.just(
                new AbstractRSocket() {
                    @Override
                    public Mono<Payload> requestResponse(Payload payload) {
                        return Mono.just(ByteBufPayload.create("HelloWorld,Charles!    " + payload.getDataUtf8()));
                    }
                });

        RSocketFactory.receive()
                .frameDecoder(Frame::retain)//Enable Zero Copy
                .acceptor(acceptor)
                .transport(TcpServerTransport.create(9527))
                .start()
                .block()
                .onClose()
                .block();
    }
}

第三步、编写客户端代码

public class RSocketClient {

    public static RSocket client;

    static {
        client = RSocketFactory.connect()
                .frameDecoder(Frame::retain)//Enable Zero Copy
                .transport(TcpClientTransport.create(9527))
                .start()
                .block();
    }

    public static void main(String[] args) {
        RSocketClient.client.requestResponse(DefaultPayload.create("I am RSocket Demo"))
                .subscribe(payload -> System.out.println(payload.getMetadataUtf8()));

        while (true);
    }


}

我们先运行服务端,可以看到如下内容输出:

17:40:48.612 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
17:40:48.616 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
17:40:48.649 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: MacOS
17:40:48.652 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
17:40:48.652 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8
17:40:48.654 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
17:40:48.654 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
17:40:48.655 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
17:40:48.655 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
17:40:48.656 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
17:40:48.656 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
17:40:48.656 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available
17:40:48.656 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
17:40:48.656 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: /var/folders/9p/mxclx8tj0257q8pl7_563bs80000gn/T (java.io.tmpdir)
17:40:48.656 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
17:40:48.658 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3817865216 bytes
17:40:48.658 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
17:40:48.659 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
17:40:48.659 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
17:40:48.678 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
17:40:48.678 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
17:40:48.679 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo0 (lo0, 0:0:0:0:0:0:0:1%lo0)
17:40:48.680 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file /proc/sys/net/core/somaxconn. Default: 128
17:40:48.694 [main] DEBUG reactor.netty.tcp.TcpResources - [tcp] resources will use the default LoopResources: DefaultLoopResources {prefix=reactor-tcp, daemon=true, selectCount=8, workerCount=8}
17:40:48.694 [main] DEBUG reactor.netty.tcp.TcpResources - [tcp] resources will use the default ConnectionProvider: PooledConnectionProvider {name=tcp, poolFactory=reactor.netty.resources.ConnectionProvider$$Lambda$14/1418621776@1a968a59}
17:40:48.698 [main] DEBUG reactor.netty.resources.DefaultLoopEpoll - Default Epoll support : false
17:40:48.699 [main] DEBUG reactor.netty.resources.DefaultLoopKQueue - Default KQueue support : false
17:40:48.704 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
17:40:48.724 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
17:40:48.724 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
17:40:48.731 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
17:40:48.731 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
17:40:48.741 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
17:40:48.976 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 53814 (auto-detected)
17:40:48.980 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 6c:96:cf:ff:fe:dd:15:d5 (auto-detected)
17:40:48.993 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
17:40:48.993 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
17:40:49.023 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
17:40:49.023 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
17:40:49.023 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
17:40:49.024 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
17:40:49.038 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
17:40:49.038 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
17:40:49.038 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
17:40:49.115 [reactor-tcp-nio-1] DEBUG reactor.netty.tcp.TcpServer - [id: 0xc5709c4f, L:/0:0:0:0:0:0:0:0:9527] Bound new server

再运行客户端,可以看到消息发送成功,关键消息如下:

17:41:22.244 [reactor-tcp-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBoundstrue
17:41:22.246 [reactor-tcp-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetectorio.netty.util.ResourceLeakDetector@8354e30
17:41:22.270 [reactor-tcp-nio-4] DEBUG reactor.netty.channel.FluxReceive - [id: 0x6924804f, L:/0:0:0:0:0:0:0:1:54848 - R:/0:0:0:0:0:0:0:1:9527] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
17:41:22.343 [reactor-tcp-nio-4] DEBUG reactor.netty.ReactorNetty - [id: 0x6924804f, L:/0:0:0:0:0:0:0:1:54848 - R:/0:0:0:0:0:0:0:1:9527] Added decoder [RSocketLengthCodec] at the end of the user pipeline, full pipeline: [RSocketLengthCodec, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
17:41:22.343 [reactor-tcp-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id: 0x6924804f, L:/0:0:0:0:0:0:0:1:54848 - R:/0:0:0:0:0:0:0:1:9527] Channel connected, now 1 active connections and 0 inactive connections
17:41:22.355 [main] DEBUG io.rsocket.FrameLogger - sending -> Frame => Stream ID: 1 Type: REQUEST_RESPONSE Payload: data: "I am RSocket Demo" 
17:41:22.505 [reactor-tcp-nio-4] DEBUG io.rsocket.FrameLogger - receiving -> Frame => Stream ID: 1 Type: NEXT_COMPLETE Payload: data: "HelloWorld,Charles!    I am RSocket Demo" 

服务端显示收到消息:

17:41:22.246 [reactor-tcp-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@8354e30
17:41:22.270 [reactor-tcp-nio-4] DEBUG reactor.netty.channel.FluxReceive - [id: 0x6924804f, L:/0:0:0:0:0:0:0:1:54848 - R:/0:0:0:0:0:0:0:1:9527] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
17:41:22.343 [reactor-tcp-nio-4] DEBUG reactor.netty.ReactorNetty - [id: 0x6924804f, L:/0:0:0:0:0:0:0:1:54848 - R:/0:0:0:0:0:0:0:1:9527] Added decoder [RSocketLengthCodec] at the end of the user pipeline, full pipeline: [RSocketLengthCodec, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
17:41:22.343 [reactor-tcp-nio-4] DEBUG reactor.netty.resources.PooledConnectionProvider - [id0x6924804f, L:/0:0:0:0:0:0:0:1:54848 - R:/0:0:0:0:0:0:0:1:9527] Channel connected, now 1 active connections and 0 inactive connections
17:41:22.355 [main] DEBUG io.rsocket.FrameLogger - sending -> Frame => Stream ID1 Type: REQUEST_RESPONSE Payload: data"I am RSocket Demo" 
17:41:22.505 [reactor-tcp-nio-4] DEBUG io.rsocket.FrameLogger - receiving -> Frame => Stream ID1 Type: NEXT_COMPLETE Payload: data"HelloWorld,Charles!    I am RSocket Demo" 



       

       

     

       

            

       

     

       

       

      

     

     

       

      

        

以上是关于RSocket思潮的主要内容,如果未能解决你的问题,请参考以下文章

Spring RSocket:基于服务注册发现的 RSocket 负载均衡

云原生实践之 RSocket 从入门到落地:Servlet vs RSocket

云原生实践之 RSocket 从入门到落地:Servlet vs RSocket

如何在内卷的思潮中保持自己的定力

Spring官方RSocket Broker 0.3.0发布: 快速构建你的RSocket架构

初探Rsocket