Dubbo3.0新特性集成RSocket,新增响应式支持

Posted Kirito的技术分享

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo3.0新特性集成RSocket,新增响应式支持相关的知识,希望对你有一定的参考价值。


响应式编程

响应式编程现在是现在一个很热的话题。响应式编程让开发者更方便地编写高性能的异步代码,关于响应式编程更详细的信息可以参考 http://reactivex.io/ 。很可惜,在之前很长一段时间里,Dubbo 并不支持响应式编程,简单来说,Dubbo 不支持在 RPC 调用时,使用 Mono/Flux 这种流对象(reactive-stream 中流的概念),给用户使用带来了不便。

RSocket 是一个支持 reactive-stream 语义的开源网络通信协议,它将 reactive 语义的复杂逻辑封装了起来,使得上层可以方便实现网络程序。RSocket 详细资料:http://rsocket.io/。

Dubbo 在 3.0.0-SNAPSHOT 版本里基于 RSocket 对响应式编程提供了支持,用户可以在请求参数和返回值里使用 Mono 和 Flux 类型的对象。下面我们给出使用范例,源码可以在文末获取。

Dubbo RSocket 初体验

服务接口

 
   
   
 
  1. public interface DemoService {

  2. Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2);

  3. Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2);

  4. }

 
   
   
 
  1. <dependency>

  2. <groupId>io.projectreactor</groupId>

  3. <artifactId>reactor-core</artifactId>

  4. <version>3.2.3-RELEASE</version>

  5. </dependency>

在服务定义层,引入了 Mono,Flux 等 reactor 的概念,所以需要添加 reactor-core 的依赖。

服务提供者

 
   
   
 
  1. public class DemoServiceImpl implements DemoService {

  2. @Override

  3. public Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2) {

  4. return m1.zipWith(m2, new BiFunction<String, String, String>() {

  5. @Override

  6. public String apply(String s, String s2) {

  7. return s+" "+s2;

  8. }

  9. });

  10. }

  11. @Override

  12. public Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2) {

  13. return f1.zipWith(f2, new BiFunction<String, String, String>() {

  14. @Override

  15. public String apply(String s, String s2) {

  16. return s+" "+s2;

  17. }

  18. });

  19. }

  20. }

除了常规的 Dubbo 必须依赖之外,还需要添加 dubbo-rsocket 的扩展

 
   
   
 
  1. //... other dubbo moudle

  2. <dependency>

  3. <groupId>org.apache.dubbo</groupId>

  4. <artifactId>dubbo-rpc-rsocket</artifactId>

  5. </dependency>

配置并启动服务端,注意协议名字填写 rsocket:

 
   
   
 
  1. <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  2. xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"

  3. xmlns="http://www.springframework.org/schema/beans"

  4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

  5.        http://dubbo.apache.org/schema/Dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

  6. <!-- provider's application name, used for tracing dependency relationship -->

  7. <dubbo:application name="demo-provider"/>

  8. <!-- use registry center to export service -->

  9. <dubbo:registry address="zookeeper://127.0.0.1:2181"/>

  10. <!-- use Dubbo protocol to export service on port 20890 -->

  11. <dubbo:protocol name="rsocket" port="20890"/>

  12. <!-- service implementation, as same as regular local bean -->

  13. <bean id="demoService" class="org.apache.dubbo.samples.basic.impl.DemoServiceImpl"/>

  14. <!-- declare the service interface to be exported -->

  15. <dubbo:service interface="org.apache.dubbo.samples.basic.api.DemoService" ref="demoService"/>

  16. </beans>

服务提供者的 bootstrap:

 
   
   
 
  1. public class RsocketProvider {

  2. public static void main(String[] args) throws Exception {

  3. ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-provider.xml"});

  4. context.start();

  5. System.in.read(); // press any key to exit

  6. }

  7. }

服务消费者

然后配置并启动消费者消费者如下, 注意协议名填写 rsocket:

 
   
   
 
  1. <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  2. xmlns:dubbo="http://dubbo.apache.org/schema/Dubbo"

  3. xmlns="http://www.springframework.org/schema/beans"

  4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

  5.        http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

  6. <!-- consumer's application name, used for tracing dependency relationship (not a matching criterion),

  7. don't set it same as provider -->

  8. <dubbo:application name="demo-consumer"/>

  9. <!-- use registry center to discover service -->

  10. <dubbo:registry address="zookeeper://127.0.0.1:2181"/>

  11. <!-- generate proxy for the remote service, then demoService can be used in the same way as the

  12. local regular interface -->

  13. <dubbo:reference id="demoService" check="true" interface="org.apache.dubbo.samples.basic.api.DemoService"/>

  14. </beans>

 
   
   
 
  1. public class RsocketConsumer {

  2. public static void main(String[] args) {

  3. ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-consumer.xml"});

  4. context.start();

  5. DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy

  6. while (true) {

  7. try {

  8. Mono<String> monoResult = demoService.requestMonoWithMonoArg(Mono.just("A"), Mono.just("B"));

  9. monoResult.doOnNext(new Consumer<String>() {

  10. @Override

  11. public void accept(String s) {

  12. System.out.println(s);

  13. }

  14. }).block();

  15. Flux<String> fluxResult = demoService.requestFluxWithFluxArg(Flux.just("A","B","C"), Flux.just("1","2","3"));

  16. fluxResult.doOnNext(new Consumer<String>() {

  17. @Override

  18. public void accept(String s) {

  19. System.out.println(s);

  20. }

  21. }).blockLast();

  22. } catch (Throwable throwable) {

  23. throwable.printStackTrace();

  24. }

  25. }

  26. }

  27. }

可以看到配置上除了协议名使用 rsocket 以外其他并没有特殊之处。

实现原理

以前用户并不能在参数或者返回值里使用 Mono/Flux 这种流对象(reactive-stream里的流的概念)。因为流对象自带异步属性,当业务把流对象作为参数或者返回值传递给框架之后,框架并不能将流对象正确的进行序列化。

Dubbo 基于 RSocket 提供了 reactive 支持。RSocket 将 reactive 语义的复杂逻辑封装起来了,给上层提供了简洁的抽象如下:

 
   
   
 
  1. Mono<Void> fireAndForget(Payload payload);

  2. Mono<Payload> requestResponse(Payload payload);

  3. Flux<Payload> requestStream(Payload payload);

  4. Flux<Payload> requestChannel(Publisher<Payload> payloads);

  • 从客户端视角看,框架建立连接之后,只需要将请求信息编码到 Payload 里,然后通过 requestStream 方法即可向服务端发起请求。

  • 从服务端视角看,RSocket 收到请求之后,会调用我们实现的 requestStream 方法,我们从 Payload 里解码得到请求信息之后,调用业务方法,然后拿到 Flux 类型的返回值即可。

需要注意的是业务返回值一般是 Flux<BizDO>,而 RSocket 要求的是 Flux<Payload>,所以我们需要通过 map operator 拦截业务数据,将 BizDO 编码为 Payload 才可以递交给 RSocket。而 RSocket 会负责数据的传输和 reactive 语义的实现。

结语

Dubbo 2.7 相比 Dubbo 2.6 提供了 CompletableFuture 的异步化支持,在 Dubbo 3.0 又继续拥抱了 Reactive,不断对新特性的探索,无疑是增加了使用者的信心。RSocket 这一框架/协议,如今在国内外也是比较火的一个概念,它提供了丰富的 Reactive 语义以及多语言的支持,使得服务治理框架可以很快地借助它实现 Reactive 语义。有了响应式编程支持,业务可以更加方便的实现异步逻辑。

本篇文章对 Dubbo RSocket 进行了一个简单的介绍,对 Reactive、RSocket 感兴趣的同学也可以浏览下 Dubbo 3.0 源码对 RSocket 的封装。

相关链接:

[1] 文中源码:https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-rsocket

[2] Dubbo 3.x 开发分支:https://github.com/apache/incubator-Dubbo/tree/3.x-dev


以上是关于Dubbo3.0新特性集成RSocket,新增响应式支持的主要内容,如果未能解决你的问题,请参考以下文章

Dubbo 3.0 ? No ! RSocket 永远的神

浅谈 RSocket 与响应式编程

浅谈RSocket与响应式编程

dubbo3.0新特性总结

dubbo3.0新特性总结

响应式应用新协议RSocket