java版gRPC实战之五:双向流

Posted 程序员欣宸

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java版gRPC实战之五:双向流相关的知识,希望对你有一定的参考价值。

欢迎访问我的GitHub

《java版gRPC实战》全系列链接

  1. 用proto生成代码
  2. 服务发布和调用
  3. 服务端流
  4. 客户端流
  5. 双向流
  6. 客户端动态获取服务端地址
  7. 基于eureka的注册发现

本篇概览

  • 本文是《java版gRPC实战》系列的第五篇,目标是掌握双向流类型的服务,即请求参数是流的形式,响应的内容也是流的形式;
  • 先来看看官方资料对双向流式RPC的介绍:是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留;
  • 掌握了客户端流和服务端流两种类型的开发后,双向流类型就很好理解了,就是之前两种类型的结合体,请求和响应都按照流的方式处理即可;
  • 今天的实战,咱们来设计一个在线商城的功能:批量减扣库存,即客户端提交多个商品和数量,服务端返回每个商品减扣库存成功和失败的情况;
  • 咱们尽快进入编码环节吧,具体内容如下:
    1. 在proto文件中定义双向流类型的gRPC接口,再通过proto生成java代码
    2. 开发服务端应用
    3. 开发客户端应用
    4. 验证

源码下载

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,《java版gRPC实战》系列的源码在==grpc-tutorials==文件夹下,如下图红框所示:
  • ==grpc-tutorials==文件夹下有多个目录,本篇文章对应的服务端代码在==double-stream-server-side==目录下,客户端代码在==double-stream-client-side==目录下,如下图:

在proto文件中定义双向流类型的gRPC接口

  • 首先要做的就是定义gRPC接口,打开==mall.proto==,在里面新增方法和相关的数据结构,需要重点关注的是BatchDeduct方法的入参ProductOrder和返回值DeductReply都添加了==stream==修饰(ProductOrder是上一章定义的),代表该方法是双向流类型:
    
    // gRPC服务,这是个在线商城的库存服务
    service StockService 
    // 双向流式:批量扣减库存
    rpc BatchDeduct (stream ProductOrder) returns (stream DeductReply) 
    

// 扣减库存返回结果的数据结构
message DeductReply
// 返回码
int32 code = 1;
// 描述信息
string message = 2;

- 双击下图红框中的task即可生成java代码:
![在这里插入图片描述](https://s4.51cto.com/images/blog/202112/29083016_61cbac18d4ede43729.png?x-oss-process=image/watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=)
- 生成下图红框中的文件,即服务端定义和返回值数据结构:
![在这里插入图片描述](https://s4.51cto.com/images/blog/202112/29083016_61cbac18cb37954363.png?x-oss-process=image/watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=)
- 接下来开发服务端;
### 开发服务端应用
- 在父工程grpc-turtorials下面新建名为==double-stream-server-side==的模块,其build.gradle内容如下:
```groovy
// 使用springboot插件
plugins 
    id org.springframework.boot


dependencies 
    implementation org.projectlombok:lombok
    implementation org.springframework.boot:spring-boot-starter
    // 作为gRPC服务提供方,需要用到此库
    implementation net.devh:grpc-server-spring-boot-starter
    // 依赖自动生成源码的工程
    implementation project(:grpc-lib)
    // annotationProcessor不会传递,使用了lombok生成代码的模块,需要自己声明annotationProcessor
    annotationProcessor org.projectlombok:lombok
  • 配置文件application.yml:
    spring:
    application:
    name: double-stream-server-side
    # gRPC有关的配置,这里只需要配置服务端口号
    grpc:
    server:
    port: 9901
  • 启动类DoubleStreamServerSideApplication.java的代码就不贴了,普通的springboot启动类而已;
  • 重点是提供grpc服务的GrpcServerService.java,咱们要做的就是给上层框架返回一个匿名类,至于里面的onNext、onCompleted方法何时被调用是上层框架决定的,另外还准备了成员变量totalCount,这样就可以记录总数了,由于请求参数是流,因此匿名类的onNext会被多次调用,并且由于返回值是流,因此onNext中调用了==responseObserver.onNext==方法来响应流中的每个请求,这样客户端就不断收到服务端的响应数据(即客户端的onNext方法会被多次调用):
    
    package grpctutorials;

import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;

@GrpcServicebr/>@Slf4j
public class GrpcServerService extends StockServiceGrpc.StockServiceImplBase

@Override
public StreamObserver<ProductOrder> batchDeduct(StreamObserver<DeductReply> responseObserver) 
    // 返回匿名类,给上层框架使用
    return new StreamObserver<ProductOrder>() 

        private int totalCount = 0;

        @Override
        public void onNext(ProductOrder value) 
            log.info("正在处理商品[],数量为[]",
                    value.getProductId(),
                    value.getNumber());

            // 增加总量
            totalCount += value.getNumber();

            int code;
            String message;

            // 假设单数的都有库存不足的问题
            if (0 == value.getNumber() % 2) 
                code = 10000;
                message = String.format("商品[%d]扣减库存数[%d]成功", value.getProductId(), value.getNumber());
             else 
                code = 10001;
                message = String.format("商品[%d]扣减库存数[%d]失败", value.getProductId(), value.getNumber());
            

            responseObserver.onNext(DeductReply.newBuilder()
                    .setCode(code)
                    .setMessage(message)
                    .build());
        

        @Override
        public void onError(Throwable t) 
            log.error("批量减扣库存异常", t);
        

        @Override
        public void onCompleted() 
            log.info("批量减扣库存完成,共计[]件商品", totalCount);
            responseObserver.onCompleted();
        
    ;

### 开发客户端应用
- 在父工程grpc-turtorials下面新建名为==double-stream-server-side==的模块,其build.gradle内容如下:
```groovy
plugins 
    id org.springframework.boot


dependencies 
    implementation org.projectlombok:lombok
    implementation org.springframework.boot:spring-boot-starter
    implementation org.springframework.boot:spring-boot-starter-web
    implementation net.devh:grpc-client-spring-boot-starter
    implementation project(:grpc-lib)
  • 配置文件application.yml,设置自己的web端口号和服务端地址:
    
    server:
    port: 8082
    spring:
    application:
    name: double-stream-client-side

grpc:
client:

gRPC配置的名字,GrpcClient注解会用到

double-stream-server-side:
  # gRPC服务端地址
  address: static://127.0.0.1:9901
  enableKeepAlive: true
  keepAliveWithoutCalls: true
  negotiationType: plaintext
- 启动类DoubleStreamClientSideApplication.java的代码就不贴了,普通的springboot启动类而已;
- 正常情况下我们都是用StreamObserver处理服务端响应,这里由于是异步响应,需要额外的方法从StreamObserver中取出业务数据,于是定一个新接口,继承自StreamObserver,新增==getExtra==方法可以返回String对象,详细的用法稍后会看到:
```java
package com.bolingcavalry.grpctutorials;

import io.grpc.stub.StreamObserver;

public interface ExtendResponseObserver<T> extends StreamObserver<T> 
    String getExtra();
  • 重头戏来了,看看如何远程调用双向流类型的gRPC接口,代码中已经添加详细注释:
    
    package grpctutorials;

import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Servicebr/>@Slf4j
public class GrpcClientService

@GrpcClient("double-stream-server-side")
private StockServiceGrpc.StockServiceStub stockServiceStub;

/**
 * 批量减库存
 * @param count
 * @return
 */
public String batchDeduct(int count) 

    CountDownLatch countDownLatch = new CountDownLatch(1);

    // responseObserver的onNext和onCompleted会在另一个线程中被执行,
    // ExtendResponseObserver继承自StreamObserver
    ExtendResponseObserver<DeductReply> responseObserver = new ExtendResponseObserver<DeductReply>() 

        // 用stringBuilder保存所有来自服务端的响应
        private StringBuilder stringBuilder = new StringBuilder();

        @Override
        public String getExtra() 
            return stringBuilder.toString();
        

        /**
         * 客户端的流式请求期间,每一笔请求都会收到服务端的一个响应,
         * 对应每个响应,这里的onNext方法都会被执行一次,入参是响应内容
         * @param value
         */
        @Override
        public void onNext(DeductReply value) 
            log.info("batch deduct on next");
            // 放入匿名类的成员变量中
            stringBuilder.append(String.format("返回码[%d],返回信息:%s<br>" , value.getCode(), value.getMessage()));
        

        @Override
        public void onError(Throwable t) 
            log.error("batch deduct gRPC request error", t);
            stringBuilder.append("batch deduct gRPC error, " + t.getMessage());
            countDownLatch.countDown();
        

        /**
         * 服务端确认响应完成后,这里的onCompleted方法会被调用
         */
        @Override
        public void onCompleted() 
            log.info("batch deduct on complete");
            // 执行了countDown方法后,前面执行countDownLatch.await方法的线程就不再wait了,
            // 会继续往下执行
            countDownLatch.countDown();
        
    ;

    // 远程调用,此时数据还没有给到服务端
    StreamObserver<ProductOrder> requestObserver = stockServiceStub.batchDeduct(responseObserver);

    for(int i=0; i<count; i++) 
        // 每次执行onNext都会发送一笔数据到服务端,
        // 服务端的onNext方法都会被执行一次
        requestObserver.onNext(build(101 + i, 1 + i));
    

    // 客户端告诉服务端:数据已经发完了
    requestObserver.onCompleted();

    try 
        // 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行,
        // 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了,
        // await的超时时间设置为2秒
        countDownLatch.await(2, TimeUnit.SECONDS);
     catch (InterruptedException e) 
        log.error("countDownLatch await error", e);
    

    log.info("service finish");
    // 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得
    return responseObserver.getExtra();


/**
 * 创建ProductOrder对象
 * @param productId
 * @param num
 * @return
 */
private static ProductOrder build(int productId, int num) 
    return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();

- 最后做个web接口,可以通过web请求验证远程调用:
```java
package grpctutorials;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class GrpcClientController 

    @Autowired
    private GrpcClientService grpcClientService;

    @RequestMapping("/")
    public String printMessage(@RequestParam(defaultValue = "1") int count) 
        return grpcClientService.batchDeduct(count);
    

欢迎关注公众号:程序员欣宸

以上是关于java版gRPC实战之五:双向流的主要内容,如果未能解决你的问题,请参考以下文章

java版gRPC实战之六:客户端动态获取服务端地址

java版gRPC实战之七:基于eureka的注册发现

java版gRPC实战之二:服务发布和调用

java版gRPC实战之一:用proto生成代码

grpc双向流究竟是什么情况?2段代码告诉你

java版gRPC实战之三:服务端流