用 RSocket 解决响应式服务之间的的通讯-Part 3:基于 RSocket 进行抽象

Posted 涤生的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用 RSocket 解决响应式服务之间的的通讯-Part 3:基于 RSocket 进行抽象相关的知识,希望对你有一定的参考价值。

RSocket 系列的第三篇

如果你看过本系列的前两篇文章,应该已经已经发现 RSocket 提供了一些底层的 API。可以直接使用交互模型中的方法进行操作,而且可以没有任何限制来回发送帧。这些基础的 API 为我们提供了许多自由和控制权,但是它可能会引入额外的问题,尤其是与微服务之间的契约相关的问题。

为了解决这些问题,我们可以使用 RSocket 作为通用抽象层。有两种可用的解决方案:RSocket RPC 或者与 Spring Framework 集成。在以下各节中,我们将简要讨论它们。

RPC Over RSocket

基于 RSocket 的 RPC

保持微服务之间的契约干净清晰是分布式系统的关键问题之一。为了确保应用程序可以交换数据,我们可以利用 RPC(远程过程调用)。幸运的是,RSocket 具有专用的 RPC 模块,它使用 Protobuf 作为序列化工具,因此,我们可以从 RSocket 的性能中受益并且同时具有保持契约的能力。通过将生成的服务和对象与 RSocket 接受器结合在一起,我们可以启动完全可操作的 RPC 服务端,并使用 RPC 客户端轻松使用它。

首先,我们需要定义服务和对象。在下面的示例中,我们创建了具有四个方法的简单的 CustomerService服务,它们每个表示交互模型相互不同的方法。

 
   
   
 
  1. syntax = "proto3";

  2. option java_multiple_files = true;

  3. option java_outer_classname = "ServiceProto";

  4. package com.rsocket.rpc;

  5. import "google/protobuf/empty.proto";

  6. message SingleCustomerRequest {

  7. string id = 1;

  8. }

  9. message MultipleCustomersRequest {

  10. repeated string ids = 1;

  11. }

  12. message CustomerResponse {

  13. string id = 1;

  14. string name = 2;

  15. }

  16. service CustomerService {

  17. rpc getCustomer(SingleCustomerRequest) returns (CustomerResponse) {} //request-response

  18. rpc getCustomers(MultipleCustomersRequest) returns (stream CustomerResponse) {} //request-stream

  19. rpc deleteCustomer(SingleCustomerRequest) returns (google.protobuf.Empty) {} //fire'n'forget

  20. rpc customerChannel(stream MultipleCustomersRequest) returns (stream CustomerResponse) {} //request-channel

  21. }

其次,我们必须使用上面显示的 proto 文件来生成类。为此,我们可以创建一个 Gradle 任务,如下所示:

 
   
   
 
  1. protobuf {

  2. protoc {

  3. artifact = 'com.google.protobuf:protoc:3.6.1'

  4. }

  5. generatedFilesBaseDir = "${projectDir}/build/generated-sources/"

  6. plugins {

  7. rsocketRpc {

  8. artifact = 'io.rsocket.rpc:rsocket-rpc-protobuf:0.2.17'

  9. }

  10. }

  11. generateProtoTasks {

  12. all()*.plugins {

  13. rsocketRpc {}

  14. }

  15. }

  16. }

作为 generateProto任务的结果,我们应该能够获得服务接口、服务客户端和服务服务端类

  • CustomerService

  • CustomerServiceClient

  • CustomerServiceServer

再次,我们必须实现服务接口(CustomerService)的相关业务逻辑:

 
   
   
 
  1. public class DefaultCustomerService implements CustomerService {

  2. private static final List RANDOM_NAMES = Arrays.asList("Andrew", "Joe", "Matt", "Rachel", "Robin", "Jack");

  3. @Override

  4. public Mono getCustomer(SingleCustomerRequest message, ByteBuf metadata) {

  5. log.info("Received 'getCustomer' request [{}]", message);

  6. return Mono.just(CustomerResponse.newBuilder()

  7. .setId(message.getId())

  8. .setName(getRandomName())

  9. .build());

  10. }

  11. @Override

  12. public Flux getCustomers(MultipleCustomersRequest message, ByteBuf metadata) {

  13. return Flux.interval(Duration.ofMillis(1000))

  14. .map(time -> CustomerResponse.newBuilder()

  15. .setId(UUID.randomUUID().toString())

  16. .setName(getRandomName())

  17. .build());

  18. }

  19. @Override

  20. public Mono deleteCustomer(SingleCustomerRequest message, ByteBuf metadata) {

  21. log.info("Received 'deleteCustomer' request [{}]", message);

  22. return Mono.just(Empty.newBuilder().build());

  23. }

  24. @Override

  25. public Flux customerChannel(Publisher messages, ByteBuf metadata) {

  26. return Flux.from(messages)

  27. .doOnNext(message -> log.info("Received 'customerChannel' request [{}]", message))

  28. .map(message -> CustomerResponse.newBuilder()

  29. .setId(UUID.randomUUID().toString())

  30. .setName(getRandomName())

  31. .build());

  32. }

  33. private String getRandomName() {

  34. return RANDOM_NAMES.get(new Random().nextInt(RANDOM_NAMES.size() - 1));

  35. }

  36. }

最后,我们可以通过 RSocket 暴露服务。为此,我们必须创建服务端的实例(CustomerServiceServer)并注入服务的实现(DefaultCustomerService)。然后,我们准备创建一个 RSocket 接受器实例。该 API 提供了 RequestHandlingRSocket,该服务包装服务端实例,并将契约中定义的端点转换为 RSocket 交互模型中可用的方法。

 
   
   
 
  1. public class Server {

  2. public static void main(String[] args) throws InterruptedException {

  3. CustomerServiceServer serviceServer = new CustomerServiceServer(new DefaultCustomerService(), Optional.empty(), Optional.empty());

  4. RSocketFactory

  5. .receive()

  6. .acceptor((setup, sendingSocket) -> Mono.just(

  7. new RequestHandlingRSocket(serviceServer)

  8. ))

  9. .transport(TcpServerTransport.create(7000))

  10. .start()

  11. .block();

  12. Thread.currentThread().join();

  13. }

  14. }

在客户端,实现非常简单。我们需要做的就是创建 RSocket 实例,并通过构造函数将其注入到客户端实例中,然后就可以开始了。


    1. @Slf4j

    2. public class Client {

    3. public static void main(String[] args) {

    4. RSocket rSocket = RSocketFactory

    5. .connect()

    6. .transport(TcpClientTransport.create(7000))

    7. .start()

    8. .block();

    9. CustomerServiceClient customerServiceClient = new CustomerServiceClient(rSocket);

    10. customerServiceClient.deleteCustomer(SingleCustomerRequest.newBuilder()

    11. .setId(UUID.randomUUID().toString()).build())

    12. .block();

    13. customerServiceClient.getCustomer(SingleCustomerRequest.newBuilder()

    14. .setId(UUID.randomUUID().toString()).build())

    15. .doOnNext(response -> log.info("Received response for 'getCustomer': [{}]", response))

    16. .block();

    17. customerServiceClient.getCustomers(MultipleCustomersRequest.newBuilder()

    18. .addIds(UUID.randomUUID().toString()).build())

    19. .doOnNext(response -> log.info("Received response for 'getCustomers': [{}]", response))

    20. .subscribe();

    21. customerServiceClient.customerChannel(s -> s.onNext(MultipleCustomersRequest.newBuilder()

    22. .addIds(UUID.randomUUID().toString())

    23. .build()))

    24. .doOnNext(customerResponse -> log.info("Received response for 'customerChannel' [{}]", customerResponse))

    25. .blockLast();

    26. }

    27. }


将 RSocket 与 RPC 方法结合使用有助于维护微服务之间的契约,并改善日常开发人员的体验。它适用于不需要完全控制帧的典型场景,但是另一方面,它不限制协议的灵活性。我们仍然可以在同一应用程序中暴露 RPC 端点以及普通的 RSocket 接受器,以便我们可以轻松地为给定用例选择最佳的通信模式。在 RSocket 上进行 RPC 的情况下,可能会出现一个基本的问题:它比 gRPC 好吗?这个问题没有简单的答案。RSocket 是一项新技术,它需要一些时间才能达到与 gRPC 相同的成熟度。另一方面,它在两个方面超过了 gRPC:性能(这里可以使用基准测试)和灵活性——可以作为传输层用于 RPC 或作为普通消息传递解决方案。在决定在生产环境中使用哪种软件之前,应该确定 RSocket 是否符合的“早期采用”策略,并且不会使软件面临风险。就个人而言,我建议在不太重要的区域引入 RSocket,然后再扩展到系统的其余部分。

Spring Boot Integration

第二个可用的解决方案是通过与 Spring Boot 的集成提供对 RSocket 的抽象,我们将 RSocket 用作反应式消息传递解决方案,并利用 Spring 注解轻松地将方法与路由连接起来。在下面的示例中,我们实现了两个 Spring Boot 应用程序(请求者和响应者)。响应者通过 CustomerController暴露 RSocket 接口,并映射到三个路径:customercustomer-streamcustomer-channel。这些映射中的每一个都反映了来自 RSocket 交互模型的不同方法(分别是请求-响应,请求流和通道)。CustomerController还实现了简单的业务逻辑,并返回带有随机名称的 CustomerResponse对象,如下例所示:

 
   
   
 
    1. @Slf4j

    2. @SpringBootApplication

    3. public class RSocketResponderApplication {

    4. public static void main(String[] args) {

    5. SpringApplication.run(RSocketResponderApplication.class);

    6. }

    7. @Controller

    8. public class CustomerController {

    9. private final List RANDOM_NAMES = Arrays.asList("Andrew", "Joe", "Matt", "Rachel", "Robin", "Jack");

    10. @MessageMapping("customer")

    11. CustomerResponse getCustomer(CustomerRequest customerRequest) {

    12. return new CustomerResponse(customerRequest.getId(), getRandomName());

    13. }

    14. @MessageMapping("customer-stream")

    15. Flux getCustomers(MultipleCustomersRequest multipleCustomersRequest) {

    16. return Flux.range(0, multipleCustomersRequest.getIds().size())

    17. .delayElements(Duration.ofMillis(500))

    18. .map(i -> new CustomerResponse(multipleCustomersRequest.getIds().get(i), getRandomName()));

    19. }

    20. @MessageMapping("customer-channel")

    21. Flux getCustomersChannel(Flux requests) {

    22. return Flux.from(requests)

    23. .doOnNext(message -> log.info("Received 'customerChannel' request [{}]", message))

    24. .map(message -> new CustomerResponse(message.getId(), getRandomName()));

    25. }

    26. private String getRandomName() {

    27. return RANDOM_NAMES.get(new Random().nextInt(RANDOM_NAMES.size() - 1));

    28. }

    29. }

    30. }


请注意,下面提供的示例基于 Spring Boot RSocket starter 2.2.0.M4 版本,这意味着它不是正式版本,API 可能会更改。

值得注意的是,Spring Boot 会自动检测类路径上的 RSocket 库并启动服务端。我们需要做的就是指定端口:

 
   
   
 
  1. spring:

  2. rsocket:

  3. server:

  4. port: 7000

这几行代码和配置设置了完全可操作的响应者程序。让我们再看一下请求方。在这里,我们实现了 CustomerServiceAdapter,它负责与响应者进行通信。它使用 RSocketRequester bean 封装 RSocket 实例,该 bean 中还包含数据类型以及封装在 RSocketStrategies 对象中编码/解码的详细信息。再用反应式的方式给 RSocketRequester 配置路由消息以及处理数据的序列化/反序列化信息。总结下来,我们需要做的就是提供路由、数据以及消费响应者的消息的方式——作为单个对象(Mono)或作为流(Flux)。

 
   
   
 
    1. @Slf4j

    2. @SpringBootApplication

    3. public class RSocketRequesterApplication {

    4. public static void main(String[] args) {

    5. SpringApplication.run(RSocketRequesterApplication.class);

    6. }

    7. @Bean

    8. RSocket rSocket() {

    9. return RSocketFactory

    10. .connect()

    11. .frameDecoder(PayloadDecoder.ZERO_COPY)

    12. .dataMimeType(MimeTypeUtils.APPLICATION_JSON_VALUE)

    13. .transport(TcpClientTransport.create(7000))

    14. .start()

    15. .block();

    16. }

    17. @Bean

    18. RSocketRequester rSocketRequester(RSocket rSocket, RSocketStrategies rSocketStrategies) {

    19. return RSocketRequester.wrap(rSocket, MimeTypeUtils.APPLICATION_JSON,

    20. rSocketStrategies);

    21. }

    22. @Component

    23. class CustomerServiceAdapter {

    24. private final RSocketRequester rSocketRequester;

    25. CustomerServiceAdapter(RSocketRequester rSocketRequester) {

    26. this.rSocketRequester = rSocketRequester;

    27. }

    28. Mono getCustomer(String id) {

    29. return rSocketRequester

    30. .route("customer")

    31. .data(new CustomerRequest(id))

    32. .retrieveMono(CustomerResponse.class)

    33. .doOnNext(customerResponse -> log.info("Received customer as mono [{}]", customerResponse));

    34. }

    35. Flux getCustomers(List ids) {

    36. return rSocketRequester

    37. .route("customer-stream")

    38. .data(new MultipleCustomersRequest(ids))

    39. .retrieveFlux(CustomerResponse.class)

    40. .doOnNext(customerResponse -> log.info("Received customer as flux [{}]", customerResponse));

    41. }

    42. Flux getCustomerChannel(Flux customerRequestFlux) {

    43. return rSocketRequester

    44. .route("customer-channel")

    45. .data(customerRequestFlux, CustomerRequest.class)

    46. .retrieveFlux(CustomerResponse.class)

    47. .doOnNext(customerResponse -> log.info("Received customer as flux [{}]", customerResponse));

    48. }

    49. }

    50. }


除了与响应者进行通信之外,请求者还通过三种路径暴露 RESTful API:/customers/{id}/customers/customers-channel。在这里,我们使用 Spring WebFlux,基于 HTTP2 协议。请注意,最后两个映射会生成文本事件流,这意味着数据可用时将被流式传输到 Web 浏览器。

 
   
   
 
  1. @RestController

  2. class CustomerController {

  3. private final CustomerServiceAdapter customerServiceAdapter;

  4. CustomerController(CustomerServiceAdapter customerServiceAdapter) {

  5. this.customerServiceAdapter = customerServiceAdapter;

  6. }

  7. @GetMapping("/customers/{id}")

  8. Mono getCustomer(@PathVariable String id) {

  9. return customerServiceAdapter.getCustomer(id);

  10. }

  11. @GetMapping(value = "/customers", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

  12. Publisher getCustomers() {

  13. return customerServiceAdapter.getCustomers(getRandomIds(10));

  14. }

  15. @GetMapping(value = "/customers-channel", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

  16. Publisher getCustomersChannel() {

  17. return customerServiceAdapter.getCustomerChannel(Flux.interval(Duration.ofMillis(1000))

  18. .map(id -> new CustomerRequest(UUID.randomUUID().toString())));

  19. }

  20. private List getRandomIds(int amount) {

  21. return IntStream.range(0, amount)

  22. .mapToObj(n -> UUID.randomUUID().toString())

  23. .collect(toList());

  24. }

  25. }

要使用上述 REST 接口,可以使用以下 curl命令:

 
   
   
 
  1. curl http://localhost:8080/customers/1

  2. curl http://localhost:8080/customers

  3. curl http://localhost:8080/customers-channel

Spring Boot 集成和 RPC 模块的是 RSocket 之上的补充解决方案。第一个是面向消息传递的,并提供了方便的消息路由 API,而 RPC 模块使开发人员可以轻松控制暴露的端口并维护微服务之间的契约。这两种解决方案都有应用场景,可以轻松地与 RSocket 底层 API 结合使用单一协议以一致的方式满足最复杂的要求。

系列总结

本文是与 RSocket 有关的微型系列的最后一部分,RSocket 是一种新的二进制协议,可以彻底改变云中服务之间的通信。其丰富的,性能和其他功能,例如,使其成为几乎所有可能的业务案例的理想选择。RSocket 的使用还可以通过可用的抽象层进行简化:Spring Boot 集成和 RPC 模块——它们可以解决最典型的日常场景。请注意,RSocket 处于候选版本(1.0.0-RC2)中,因此不建议在生产环境中使用该协议。尽管如此,还是应当保持关注的,因为不断增长的社区和大型科技公司(例如 Netflix,Facebook,阿里巴巴,Netifi)的支持可能会使 RSocket 成为云中的主要通信协议。


原文:https://dzone.com/articles/reactive-service-to-service-communication-with-rso-3

近期热文

 





喜欢本文的朋友们,欢迎长按下图关注订阅号涤生的博客,收看更多精彩内容


以上是关于用 RSocket 解决响应式服务之间的的通讯-Part 3:基于 RSocket 进行抽象的主要内容,如果未能解决你的问题,请参考以下文章

浅谈 RSocket 与响应式编程

NetfiFacebook阿里等公司共同力推“响应式编程”技术:Rsocket

响应式应用新协议RSocket

Dubbo3.0新特性集成RSocket,新增响应式支持

Spring RSocket:基于服务注册发现的 RSocket 负载均衡

技术动向 | 反应式编程的通讯协议:RSocket 101