用 RSocket 解决响应式服务之间的的通讯-Part 3:基于 RSocket 进行抽象
Posted 涤生的博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用 RSocket 解决响应式服务之间的的通讯-Part 3:基于 RSocket 进行抽象相关的知识,希望对你有一定的参考价值。
RSocket 系列的第三篇
如果你看过本系列的前两篇文章,应该已经已经发现 RSocket 提供了一些底层的 API。可以直接使用交互模型中的方法进行操作,而且可以没有任何限制来回发送帧。这些基础的 API 为我们提供了许多自由和控制权,但是它可能会引入额外的问题,尤其是与微服务之间的契约相关的问题。
为了解决这些问题,我们可以使用 RSocket 作为通用抽象层。有两种可用的解决方案:RSocket RPC 或者与 Spring Framework 集成。在以下各节中,我们将简要讨论它们。
RPC Over RSocket
基于 RSocket 的 RPC
保持微服务之间的契约干净清晰是分布式系统的关键问题之一。为了确保应用程序可以交换数据,我们可以利用 RPC(远程过程调用)。幸运的是,RSocket 具有专用的 RPC 模块,它使用 Protobuf 作为序列化工具,因此,我们可以从 RSocket 的性能中受益并且同时具有保持契约的能力。通过将生成的服务和对象与 RSocket 接受器结合在一起,我们可以启动完全可操作的 RPC 服务端,并使用 RPC 客户端轻松使用它。
首先,我们需要定义服务和对象。在下面的示例中,我们创建了具有四个方法的简单的 CustomerService
服务,它们每个表示交互模型相互不同的方法。
syntax = "proto3";
option java_multiple_files = true;
option java_outer_classname = "ServiceProto";
package com.rsocket.rpc;
import "google/protobuf/empty.proto";
message SingleCustomerRequest {
string id = 1;
}
message MultipleCustomersRequest {
repeated string ids = 1;
}
message CustomerResponse {
string id = 1;
string name = 2;
}
service CustomerService {
rpc getCustomer(SingleCustomerRequest) returns (CustomerResponse) {} //request-response
rpc getCustomers(MultipleCustomersRequest) returns (stream CustomerResponse) {} //request-stream
rpc deleteCustomer(SingleCustomerRequest) returns (google.protobuf.Empty) {} //fire'n'forget
rpc customerChannel(stream MultipleCustomersRequest) returns (stream CustomerResponse) {} //request-channel
}
其次,我们必须使用上面显示的 proto 文件来生成类。为此,我们可以创建一个 Gradle 任务,如下所示:
protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.6.1'
}
generatedFilesBaseDir = "${projectDir}/build/generated-sources/"
plugins {
rsocketRpc {
artifact = 'io.rsocket.rpc:rsocket-rpc-protobuf:0.2.17'
}
}
generateProtoTasks {
all()*.plugins {
rsocketRpc {}
}
}
}
作为 generateProto
任务的结果,我们应该能够获得服务接口、服务客户端和服务服务端类
CustomerService
CustomerServiceClient
CustomerServiceServer
再次,我们必须实现服务接口(CustomerService)的相关业务逻辑:
public class DefaultCustomerService implements CustomerService {
private static final List RANDOM_NAMES = Arrays.asList("Andrew", "Joe", "Matt", "Rachel", "Robin", "Jack");
@Override
public Mono getCustomer(SingleCustomerRequest message, ByteBuf metadata) {
log.info("Received 'getCustomer' request [{}]", message);
return Mono.just(CustomerResponse.newBuilder()
.setId(message.getId())
.setName(getRandomName())
.build());
}
@Override
public Flux getCustomers(MultipleCustomersRequest message, ByteBuf metadata) {
return Flux.interval(Duration.ofMillis(1000))
.map(time -> CustomerResponse.newBuilder()
.setId(UUID.randomUUID().toString())
.setName(getRandomName())
.build());
}
@Override
public Mono deleteCustomer(SingleCustomerRequest message, ByteBuf metadata) {
log.info("Received 'deleteCustomer' request [{}]", message);
return Mono.just(Empty.newBuilder().build());
}
@Override
public Flux customerChannel(Publisher messages, ByteBuf metadata) {
return Flux.from(messages)
.doOnNext(message -> log.info("Received 'customerChannel' request [{}]", message))
.map(message -> CustomerResponse.newBuilder()
.setId(UUID.randomUUID().toString())
.setName(getRandomName())
.build());
}
private String getRandomName() {
return RANDOM_NAMES.get(new Random().nextInt(RANDOM_NAMES.size() - 1));
}
}
最后,我们可以通过 RSocket 暴露服务。为此,我们必须创建服务端的实例(CustomerServiceServer)并注入服务的实现(DefaultCustomerService)。然后,我们准备创建一个 RSocket 接受器实例。该 API 提供了 RequestHandlingRSocket
,该服务包装服务端实例,并将契约中定义的端点转换为 RSocket 交互模型中可用的方法。
public class Server {
public static void main(String[] args) throws InterruptedException {
CustomerServiceServer serviceServer = new CustomerServiceServer(new DefaultCustomerService(), Optional.empty(), Optional.empty());
RSocketFactory
.receive()
.acceptor((setup, sendingSocket) -> Mono.just(
new RequestHandlingRSocket(serviceServer)
))
.transport(TcpServerTransport.create(7000))
.start()
.block();
Thread.currentThread().join();
}
}
在客户端,实现非常简单。我们需要做的就是创建 RSocket 实例,并通过构造函数将其注入到客户端实例中,然后就可以开始了。
@Slf4j
public class Client {
public static void main(String[] args) {
RSocket rSocket = RSocketFactory
.connect()
.transport(TcpClientTransport.create(7000))
.start()
.block();
CustomerServiceClient customerServiceClient = new CustomerServiceClient(rSocket);
customerServiceClient.deleteCustomer(SingleCustomerRequest.newBuilder()
.setId(UUID.randomUUID().toString()).build())
.block();
customerServiceClient.getCustomer(SingleCustomerRequest.newBuilder()
.setId(UUID.randomUUID().toString()).build())
.doOnNext(response -> log.info("Received response for 'getCustomer': [{}]", response))
.block();
customerServiceClient.getCustomers(MultipleCustomersRequest.newBuilder()
.addIds(UUID.randomUUID().toString()).build())
.doOnNext(response -> log.info("Received response for 'getCustomers': [{}]", response))
.subscribe();
customerServiceClient.customerChannel(s -> s.onNext(MultipleCustomersRequest.newBuilder()
.addIds(UUID.randomUUID().toString())
.build()))
.doOnNext(customerResponse -> log.info("Received response for 'customerChannel' [{}]", customerResponse))
.blockLast();
}
}
将 RSocket 与 RPC 方法结合使用有助于维护微服务之间的契约,并改善日常开发人员的体验。它适用于不需要完全控制帧的典型场景,但是另一方面,它不限制协议的灵活性。我们仍然可以在同一应用程序中暴露 RPC 端点以及普通的 RSocket 接受器,以便我们可以轻松地为给定用例选择最佳的通信模式。在 RSocket 上进行 RPC 的情况下,可能会出现一个基本的问题:它比 gRPC 好吗?这个问题没有简单的答案。RSocket 是一项新技术,它需要一些时间才能达到与 gRPC 相同的成熟度。另一方面,它在两个方面超过了 gRPC:性能(这里可以使用基准测试)和灵活性——可以作为传输层用于 RPC 或作为普通消息传递解决方案。在决定在生产环境中使用哪种软件之前,应该确定 RSocket 是否符合的“早期采用”策略,并且不会使软件面临风险。就个人而言,我建议在不太重要的区域引入 RSocket,然后再扩展到系统的其余部分。
Spring Boot Integration
第二个可用的解决方案是通过与 Spring Boot 的集成提供对 RSocket 的抽象,我们将 RSocket 用作反应式消息传递解决方案,并利用 Spring 注解轻松地将方法与路由连接起来。在下面的示例中,我们实现了两个 Spring Boot 应用程序(请求者和响应者)。响应者通过 CustomerController
暴露 RSocket 接口,并映射到三个路径:customer
、 customer-stream
和 customer-channel
。这些映射中的每一个都反映了来自 RSocket 交互模型的不同方法(分别是请求-响应,请求流和通道)。CustomerController
还实现了简单的业务逻辑,并返回带有随机名称的 CustomerResponse
对象,如下例所示:
@Slf4j
@SpringBootApplication
public class RSocketResponderApplication {
public static void main(String[] args) {
SpringApplication.run(RSocketResponderApplication.class);
}
@Controller
public class CustomerController {
private final List RANDOM_NAMES = Arrays.asList("Andrew", "Joe", "Matt", "Rachel", "Robin", "Jack");
@MessageMapping("customer")
CustomerResponse getCustomer(CustomerRequest customerRequest) {
return new CustomerResponse(customerRequest.getId(), getRandomName());
}
@MessageMapping("customer-stream")
Flux getCustomers(MultipleCustomersRequest multipleCustomersRequest) {
return Flux.range(0, multipleCustomersRequest.getIds().size())
.delayElements(Duration.ofMillis(500))
.map(i -> new CustomerResponse(multipleCustomersRequest.getIds().get(i), getRandomName()));
}
@MessageMapping("customer-channel")
Flux getCustomersChannel(Flux requests) {
return Flux.from(requests)
.doOnNext(message -> log.info("Received 'customerChannel' request [{}]", message))
.map(message -> new CustomerResponse(message.getId(), getRandomName()));
}
private String getRandomName() {
return RANDOM_NAMES.get(new Random().nextInt(RANDOM_NAMES.size() - 1));
}
}
}
请注意,下面提供的示例基于 Spring Boot RSocket starter 2.2.0.M4 版本,这意味着它不是正式版本,API 可能会更改。
值得注意的是,Spring Boot 会自动检测类路径上的 RSocket 库并启动服务端。我们需要做的就是指定端口:
spring:
rsocket:
server:
port: 7000
这几行代码和配置设置了完全可操作的响应者程序。让我们再看一下请求方。在这里,我们实现了 CustomerServiceAdapter
,它负责与响应者进行通信。它使用 RSocketRequester
bean 封装 RSocket 实例,该 bean 中还包含数据类型以及封装在 RSocketStrategies 对象中编码/解码的详细信息。再用反应式的方式给 RSocketRequester 配置路由消息以及处理数据的序列化/反序列化信息。总结下来,我们需要做的就是提供路由、数据以及消费响应者的消息的方式——作为单个对象(Mono)或作为流(Flux)。
@Slf4j
@SpringBootApplication
public class RSocketRequesterApplication {
public static void main(String[] args) {
SpringApplication.run(RSocketRequesterApplication.class);
}
@Bean
RSocket rSocket() {
return RSocketFactory
.connect()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.dataMimeType(MimeTypeUtils.APPLICATION_JSON_VALUE)
.transport(TcpClientTransport.create(7000))
.start()
.block();
}
@Bean
RSocketRequester rSocketRequester(RSocket rSocket, RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket, MimeTypeUtils.APPLICATION_JSON,
rSocketStrategies);
}
@Component
class CustomerServiceAdapter {
private final RSocketRequester rSocketRequester;
CustomerServiceAdapter(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
Mono getCustomer(String id) {
return rSocketRequester
.route("customer")
.data(new CustomerRequest(id))
.retrieveMono(CustomerResponse.class)
.doOnNext(customerResponse -> log.info("Received customer as mono [{}]", customerResponse));
}
Flux getCustomers(List ids) {
return rSocketRequester
.route("customer-stream")
.data(new MultipleCustomersRequest(ids))
.retrieveFlux(CustomerResponse.class)
.doOnNext(customerResponse -> log.info("Received customer as flux [{}]", customerResponse));
}
Flux getCustomerChannel(Flux customerRequestFlux) {
return rSocketRequester
.route("customer-channel")
.data(customerRequestFlux, CustomerRequest.class)
.retrieveFlux(CustomerResponse.class)
.doOnNext(customerResponse -> log.info("Received customer as flux [{}]", customerResponse));
}
}
}
除了与响应者进行通信之外,请求者还通过三种路径暴露 RESTful API:/customers/{id}
, /customers
, /customers-channel
。在这里,我们使用 Spring WebFlux,基于 HTTP2 协议。请注意,最后两个映射会生成文本事件流,这意味着数据可用时将被流式传输到 Web 浏览器。
@RestController
class CustomerController {
private final CustomerServiceAdapter customerServiceAdapter;
CustomerController(CustomerServiceAdapter customerServiceAdapter) {
this.customerServiceAdapter = customerServiceAdapter;
}
@GetMapping("/customers/{id}")
Mono getCustomer(@PathVariable String id) {
return customerServiceAdapter.getCustomer(id);
}
@GetMapping(value = "/customers", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Publisher getCustomers() {
return customerServiceAdapter.getCustomers(getRandomIds(10));
}
@GetMapping(value = "/customers-channel", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Publisher getCustomersChannel() {
return customerServiceAdapter.getCustomerChannel(Flux.interval(Duration.ofMillis(1000))
.map(id -> new CustomerRequest(UUID.randomUUID().toString())));
}
private List getRandomIds(int amount) {
return IntStream.range(0, amount)
.mapToObj(n -> UUID.randomUUID().toString())
.collect(toList());
}
}
要使用上述 REST 接口,可以使用以下 curl
命令:
curl http://localhost:8080/customers/1
curl http://localhost:8080/customers
curl http://localhost:8080/customers-channel
Spring Boot 集成和 RPC 模块的是 RSocket 之上的补充解决方案。第一个是面向消息传递的,并提供了方便的消息路由 API,而 RPC 模块使开发人员可以轻松控制暴露的端口并维护微服务之间的契约。这两种解决方案都有应用场景,可以轻松地与 RSocket 底层 API 结合使用单一协议以一致的方式满足最复杂的要求。
系列总结
本文是与 RSocket 有关的微型系列的最后一部分,RSocket 是一种新的二进制协议,可以彻底改变云中服务之间的通信。其丰富的,性能和其他功能,例如,使其成为几乎所有可能的业务案例的理想选择。RSocket 的使用还可以通过可用的抽象层进行简化:Spring Boot 集成和 RPC 模块——它们可以解决最典型的日常场景。请注意,RSocket 处于候选版本(1.0.0-RC2)中,因此不建议在生产环境中使用该协议。尽管如此,还是应当保持关注的,因为不断增长的社区和大型科技公司(例如 Netflix,Facebook,阿里巴巴,Netifi)的支持可能会使 RSocket 成为云中的主要通信协议。
原文:https://dzone.com/articles/reactive-service-to-service-communication-with-rso-3
近期热文
喜欢本文的朋友们,欢迎长按下图关注订阅号涤生的博客,收看更多精彩内容
以上是关于用 RSocket 解决响应式服务之间的的通讯-Part 3:基于 RSocket 进行抽象的主要内容,如果未能解决你的问题,请参考以下文章
NetfiFacebook阿里等公司共同力推“响应式编程”技术:Rsocket