Dubbo3终极特性「流式协议体系」Dubbo3服务的流式处理和响应式编程实战开发指南

Posted 洛神灬殇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo3终极特性「流式协议体系」Dubbo3服务的流式处理和响应式编程实战开发指南相关的知识,希望对你有一定的参考价值。

Streaming通信模式

流的语义保证

  • 提供消息边界,可以方便地对消息单独处理
  • 严格有序,发送端的顺序和接收端顺序一致
  • 全双工,发送不需要等待
  • 支持取消和超时

Triple新特性之Stream (流式处理)

Stream流式处理是Dubbo3新提供的一种调用类型,在以下场景时建议使用流的方式:

  • 接口需要发送大量数据,这些数据无法被放在一个RPC的请求或响应中,需要分批发送,但应用层如果按照传统的多次RPC方式无法解决顺序和性能的问题,如果需要保证有序,则只能串行发送
  • 流式场景,数据需要按照发送顺序处理, 数据本身是没有确定边界的。
  • 推送类场景,多个消息在同一个调用的上下文中被发送和处理。

Stream分为以下三种

SERVER_STREAM(服务端流)

服务端的流式处理主要是通过一次请求,服务端将大批量的数据一批一批以的方式传递给客户端,此时客户端只需要进行一次请求,接口就可以得到多次相应操作。从而减少了很多的客户端的请求以及减少对应的网络的拥堵可能。

CLIENT_STREAM(客户端流)

客户端的流式处理主要是通过多次发送的请求,服务端可以对应多次客户端的请求,进行统一相应数据信息。从而减少了很多次的服务端的响应传输次数。

BIDIRECTIONAL_STREAM(双向流)

在Dubbo3 中,流式接口以 SteamObserver 声明和使用,用户可以通过使用和实现这个接口来发送和处理流的数据、异常和结束。但是由于java语言的限制,BIDIRECTIONAL_STREAM 和 CLIENT_STREAM 的实现是一样的

对于Dubbo2 用户来说,可能会对StreamObserver感到陌生,这是Dubbo3定义的一种流类型,Dubbo2 中并不存在 Stream 的类型,所以对于迁移场景没有任何影响

Stream流式处理开发案例

配置Stream流式处理Maven依赖

<dependency>
       <groupId>org.apache.dubbo</groupId>
       <artifactId>dubbo</artifactId>
       <version>3.0.7</version>
</dependency>
<dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.19.4</version>
</dependency>
<dependency>
     <groupId>io.grpc</groupId>
     <artifactId>grpc-all</artifactId>
     <version>1.44.1</version>
</dependency>

定义数据模型

UserModel数据模型
@Data
public class UserModel 
    private Long id;
    private String name;
    private Integer age;

UserDO数据模型
@Data
public class UserDO 
    private Long id;
    private String name;
    private Integer age;

定义流式接口

package com.alibaba.alibabaserverplatform.api;
import com.alibaba.alibabaserverplatform.api.model.UserModel;
import com.alibaba.alibabaserverplatform.dao.dataobject.UserDO;
import com.google.api.client.json.rpc2.JsonRpcRequest;
import org.apache.dubbo.common.stream.StreamObserver;

public interface StreamDataRpcApi 

  /**
     * @param userModel 入参请求对象
     * @param response 出参对应结果
     */
    void serverProcess(UserModel userModel,StreamObserver<UserDO> response);

    /**
     * @param response 出参对应结果
     * @return 请求参数对象
     */
    StreamObserver<UserModel> clientAndDirectionProcess(StreamObserver<UserDO> response);



服务端流式接口

void serverProcess(UserModel userModel,StreamObserver<UserDO> response);

客户端流式接口

UserDO clientProcess(StreamObserver<UserModel> request);

Stream 方法的方法入参和返回值是严格约定的,为防止写错而导致问题,Dubbo3 框架侧做了对参数的检查, 如果出错则会抛出异常。

双向流式接口

对于双向流(BIDIRECTIONAL_STREAM), 需要注意参数中的 StreamObserver 是响应流,返回参数中的 StreamObserver 为请求流。

StreamObserver<UserModel> clientAndDirectionProcess(StreamObserver<UserDO> response)

实现流式接口

package com.alibaba.alibabaserverplatform.service;

import com.alibaba.alibabaserverplatform.api.StreamDataRpcApi;
import com.alibaba.alibabaserverplatform.api.model.UserModel;
import com.alibaba.alibabaserverplatform.dao.dataobject.UserDO;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.config.annotation.DubboService;

import java.util.List;

/**
 * @project-name:alibaba-cloud
 * @package-name:com.alibaba.alibabaserverplatform.service
 * @author:LiBo/Alex
 * @create-date:2023-01-12 21:13
 * @copyright:libo-alex4java
 * @email:liboware@gmail.com
 * @description:
 */
@DubboService
public class DefaultStreamDataRpcApi implements StreamDataRpcApi 


    @Override
    public void serverProcess(UserModel userModel, StreamObserver<UserDO> response) 
        UserDO userDO = new UserDO();
        userDO.setName(userModel.getName());
        userDO.setAge(userModel.getAge());
        userDO.setId(userModel.getId());
        response.onNext(userDO);
    


    @Override
    public StreamObserver<UserModel> clientAndDirectionProcess(StreamObserver<UserDO> response) 
        return new StreamObserver<UserModel>()
            @Override
            public void onNext(UserModel userModel) 
                UserDO userDO = new UserDO();
                userDO.setName(userModel.getName());
                userDO.setAge(userModel.getAge());
                userDO.setId(userModel.getId());
                response.onNext(userDO);
            

            @Override
            public void onError(Throwable throwable) 
                System.out.println(throwable);
            

            @Override
            public void onCompleted() 
                response.onCompleted();
                System.out.println("completed");
            
        ;
    

以上是关于Dubbo3终极特性「流式协议体系」Dubbo3服务的流式处理和响应式编程实战开发指南的主要内容,如果未能解决你的问题,请参考以下文章

Dubbo3 终极特性「云原生三中心架构」带你探索 Dubbo3 体系下的配置中心和元数据中心注册中心的原理及开发实战(中)

Dubbo3终极特性「流量治理体系」一文教你如何搭建Dubbo3的控制台服务Dubbo-Admin

Dubbo3终极特性「请求流治理体系」一文教你如何搭建Dubbo3的控制台服务Dubbo-Admin

Dubbo3终极特性「云原生三中心架构」带你探索Dubbo3体系下的配置中心和元数据中心注册中心的原理及开发实战(上)

Dubbo3终极特性「云原生三中心架构」带你探索Dubbo3体系下的配置中心和元数据中心注册中心的原理及开发实战(上)

Dubbo3终极特性「云原生体系」Kubernetes生命周期对齐探针的扩展与应用实战