初探Rsocket
Posted 像写诗一样写代码
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了初探Rsocket相关的知识,希望对你有一定的参考价值。
初探Rsocket
标签(空格分隔): 未分类
RSocket是什么
RSocket是一个完全异步的、双向、多路复用、基于消息、基于反应流回压的二进制协议,是位于OSI第5/6层或者是TCP/IP的应用层协议。被称为比HTTP更适合现代分布式系统微服务应用的通讯协议。RSocket充分拥抱反应式设计思想,更加并拥有应用消息级别的流控、灵活的帧传输协议选择、高效的性能、多样的交互方式,被寄予厚望在下一代RPC框架实践中。 按照设计者的原话来讲:
我们有 iphone 和 android 手机,我们等听通知,所以我们不需要通过请求来得到回复,我们可以得到多个回复,而不需要与设备进行交互。我们在运动时还会使用智能手表,它与后台服务器交互,提供统计信息。我们有智能助手与后端服务器交互。所有这些交互模型都是我们所说的连接体验的一部分。HTTP 真不是为此而设计的。
Maldini 认为,HTTP 的一个重要问题是,它把所有的职责都放在客户端,用重试逻辑、超时、断路器等来处理不同类型的错误。使用反应式架构构建的应用程序可以提高效率并有很好的扩展性,但 Maldini 认为,“反应式支持停留在应用程序边界上”。
和HTTP简单的客户端请求/服务端响应的模式不同的是,RSocket提供了四种交互模式,这些不同的交互模式可以为不同场景提供更高效、方便的理想通信模型,例如代替轮询方式的推送。
Fire-and-Forget:优化的请求/响应,发完即忘的形式,适用于请求之后无需响应结果。
Request-Response:即一个请求只会接受到一个响应。
Request-Stream:一个请求会接受到多个响应。
Channel:channel是双向通信的模式,客户端和服务端可以是请求者也可以是响应者,该模式下消息传递都是异步的。
设计动机
按照官网的文档来说,RSocket的设计动机首先是为了拥抱反应式应用程序,非阻塞异步并且支持背压的特性带来性能上的提升,能灵活选择基于TCP、WebSockets、Aeron、QUIC等协议作为传输RSocket帧的传输协议,其余之外还有基于request-n、租约语义的流控、更简单高效的恢复重连和取消行为以及多种匹配实际业务需求的微服务架构下可用的交互模式
和HTTP2的区别
单从某些方面来看,HTTP2好像和RSocket很相像,包括多路复用、双向流的特性。
HTTP2虽然对HTTP1.x上做了很大性能上的优化但是本质上还是没有海边HTTP1.x的语义,更适合在浏览器之间做基于请求/响应的文档传输。
HTTP2并没有基于响应者对请求者的流量控制,只有请求者对响应者的流控,并且这种流控是在字节长度层面通过
WINDOW_UPDATE
帧来进行流控,而不是应用消息层面的流控。RSocket的request(n)
和租约语义则是在消息数量层面上对另一端的流控限制。HTTP2的流模型较为低效,并且没有提供上述期望的交互模型,例如fire-and-forget。
而RSocket更适合在微服务架构下的应用之间通信。(按我的理解是,RSocket更适合于分布式环境中的各主机之间信息的交互,而HTTP则更适合设备和服务器之间的通信)
Rsocket编程初识
public static void main(String[] args) {
// SERVER
RSocketFactory.receive()
.acceptor(
//
(setup, reactiveSocket) -> {
reactiveSocket
.requestStream(DefaultPayload.create("Hello-Bidi"))
.map(Payload::getDataUtf8)
.doOnNext(System.out::println)
.subscribe();
// the acceptor to handle the request
return Mono.just(new AbstractRSocket() {
@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.just(DefaultPayload.create("response to client"));
}
});
})
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.subscribe();
// CLIENT
RSocket socket =
RSocketFactory.connect()
.acceptor(
// the acceptor to handle the request
rSocket ->
new AbstractRSocket() {
@Override
public Flux<Payload> requestStream(Payload payload) {
System.out.println(payload.getDataUtf8());
return Flux.interval(Duration.ofSeconds(1))
.map(aLong -> DefaultPayload.create("Bi-di Response => " + aLong));
}
})
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.block();
// Client and request and handle response
socket.requestStream(DefaultPayload.create("request to server"))
.doOnNext(resp -> System.out.println("resp:" + resp.getDataUtf8()))
.subscribe();
socket.onClose().block();
}
如上简单程序客户端和服务端都在一个JVM进程中,服务端的 acceptor()
方法要求注册一个 SocketAcceptor
接口。
public interface SocketAcceptor {
/**
* Accepts a new {@code RSocket} used to send requests to the peer and returns another {@code
* RSocket} that is used for accepting requests from the peer.
*
* @param setup Setup as sent by the client.
* @param sendingSocket Socket used to send requests to the peer.
* @return Socket to accept requests from the peer.
* @throws SetupException If the acceptor needs to reject the setup of this socket.
*/
Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket);
}
需要实现一个accept方法,该方法会得到一个连接套接字和连接配置,我们可以保存下该连接套接字并且后续通过该方法进行发送请求给客户端。(RSocket协议的语义是客户端和服务端对等,而黑HTTP那样明确的客户端请求/服务端响应模式) 该方法需要返回一个 Mono<RSocket>
类型,以上面的例子来说,通过实现一个 AbstractRSocket
l,我们根据需要重写其中的几个方法。 这些方法会在对端进行对应类型交互模式的请求时调用处理,并返回Mono或者Flux类型作为响应数据,主要有如下几个方法:
// 发完即忘模式
Mono<Void> fireAndForget(Payload payload);
// 请求-响应模式
Mono<Payload> requestResponse(Payload payload);
// 请求-响应流模式
Flux<Payload> requestStream(Payload payload);
// 双向channel模式
Flux<Payload> requestChannel(Publisher<Payload> payloads);
// 元数据推送
Mono<Void> metadataPush(Payload payload);
整个过程就是客户端套接字通过 socket.requestStream
发送请求数据,该方法返回Mono或者Flux类型,之后由服务端在 acceptor
中构造返回的
return Mono.just(new AbstractRSocket() {
@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.just(DefaultPayload.create("response to client"));
}
});
进行处理,该方法返回的 Flux
或者 Mono
数据作为响应结果发送给请求对端,请求对端通过对请求方法调用的Mono或者Flux类型的处理即为对响应结果的处理。
socket.requestStream(DefaultPayload.create("request to server"))
// 对响应结果进行处理
.doOnNext(resp -> System.out.println("resp:" + resp.getDataUtf8()))
.subscribe();
可以看到RSocket极大地简化了网络模型,将复杂的网络调用转化成本地的反应式行为,通过对反应式编程中Flux和Mono对象的订阅处理和返回来作为响应的处理和发送。
Rsocket在Dubbo中的使用
想要学习RSocket的实际使用就是去学习别人的源码,但目前关于开源的使用RSocket的大型项目并不多,可以去Github上搜索看看。
Dubbo3.0中也集成了RSocket,官方例子
其使用除了引入对应的依赖包之外,需要服务接口方法的返回值类型为 Mono
或者是 Flux
类型,以官方例子来说:
public interface DemoService {
Mono<String> requestMono(String name);
Mono<String> requestMonoBizError(String name);
Flux<String> requestFlux(String name);
Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2);
Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2);
}
实现原理是: Dubbo依旧将RSocket请求发送以及响应等细节职责交给RSocket的API来做,只由Dubbo的Invoker和Protocol层包装了RSocket的这些细节。 主要看RSocketInvoker和RSocketProtocol,RSocketInvoker类处理请求发送的RSocket流程,并对返回的结果进行额外的处理,RSocketProtocol则隐藏RSocket创建RSocket Server和Client的细节,以及对于请求响应数据的转化处理,因为RSocket需要的是Mono
Dubbo所做的事就是将原先的客户端和服务端由RSocket方式创建,并且将Dubbo的请求和响应格式转化为RSocket所需的传输格式,最后还是由RSocket进行发送到对端。
参考
[【Dubbo3.0新特性】集成RSocket,新增响应式支持](https://www.cnkirito.moe/dubbo-rsocket/)
[Introduction to RSocket](https://www.baeldung.com/rsocket)
[RSocket:一个面向反应式应用程序的新型应用网络协议](https://www.infoq.cn/article/2018/10/rsocket-facebook)
[Netflix-Rsocket-examples](https://github.com/netifi?utf8=%E2%9C%93&q=rsocket&type=&language=)
以上是关于初探Rsocket的主要内容,如果未能解决你的问题,请参考以下文章
阿里巴巴 Rsocket - MonoContextWrite 无法转换为 java.lang.Integer 类
Spring RSocket:基于服务注册发现的 RSocket 负载均衡
云原生实践之 RSocket 从入门到落地:Servlet vs RSocket
云原生实践之 RSocket 从入门到落地:Servlet vs RSocket