grpc双向流究竟是什么情况?2段代码告诉你

Posted 华为云开发者社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了grpc双向流究竟是什么情况?2段代码告诉你相关的知识,希望对你有一定的参考价值。

本文分享自华为云社区《grpc双向流究竟是什么情况?2段代码告诉你》,作者:breakDawn。

为什么需要grpc双向流?

有时候请求调用和返回过程,并不是简单的一问一答形式,可能会涉及一次发送,多次分批返回,或者两边随意互相发送。


因此简单的restful模型无法满足上述常见,grpc双向流应运而生,通过一个tpc链接实现了双向的异步IO通信

grpc双向流

一个 双向流式 RPC 是双方使用读写流去发送一个消息序列。
两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:
比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替的读取和写入消息,或者其他读写的组合。

  • 可以理解为常见IO模型里的异步IO的使用

每个流中的消息顺序被预留。你可以通过在请求和响应前加 stream 关键字去制定方法的类型。

  // Accepts a stream of RouteNotes sent while a route is being traversed,
  // while receiving other RouteNotes (e.g. from other users).
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) 

客户端的双向流调用

  1. 定义一个reponseOberserver, 即响应观察者,用于定义如何处理服务端返回的消息。 一般都是把消息放到一个某个阻塞队列或者单容量队列SettableFuture中。

  2. 调用stub.sendMessage(reponseOberserver), 即告诉grpc框架, 我要用这个reponseOberserver去处理sendMessage消息的响应。
    注意,这个sendMesage方法名,取决于我们的proto中怎么定义的。

  3. 然后stub.sendMessage()方法回返回给我们一个requestObserver,让我们用这个观察者.onNext()去发送请求,可以任意发多次,都是立刻返回的。

  4. 当不需要再发送时,可以调用onCompleted告知对方可以结束了
    下面是官网摘抄的代码示例:

  public void routeChat() throws Exception 
    info("*** RoutChat");
    final SettableFuture<Void> finishFuture = SettableFuture.create();
    // 定义了如何处理收到的返回消息观察者
	StreamObserver reponseObserver = new StreamObserver<RouteNote>() 
          @Override
          public void onNext(RouteNote note) 
            info("Got message \\"0\\" at 1, 2", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          

          @Override
          public void onError(Throwable t) 
            finishFuture.setException(t);
          

          @Override
          public void onCompleted() 
			// 往finishFuture设置空时,说明完成了消息流关闭了
            finishFuture.set(null);
          
        ;

    // 框架返回给我一个请求流观察者,让我用这个观察者.onNext(message)去发请求, 返回结果和我传给他的responseServer绑定了。
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat();

    try 
      RouteNote[] requests =
          newNote("First message", 0, 0), newNote("Second message", 0, 1),
              newNote("Third message", 1, 0), newNote("Fourth message", 1, 1);

      for (RouteNote request : requests) 
        info("Sending message \\"0\\" at 1, 2", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      
      requestObserver.onCompleted();

      finishFuture.get();
      info("Finished RouteChat");
     catch (Exception t) 
      requestObserver.onError(t);
      logger.log(Level.WARNING, "RouteChat Failed", t);
      throw t;
    
  

服务端的处理方式:

  1. 我们建立服务端的时候,需要调用nettyServer,建立netty服务,并绑定一个xxxServiceImpl抽象类。 这个xxxServiceImpl就是我们在proto中定义的server结构,支持处理我们定义的消息。

  2. xxxServiceImpl中, 有很多需要覆写的方法, 需要你定义如何处理收到的请求, 以及如何给客户端发送响应。发送响应的动作就是参数里的requestObserver.onNext(响应消息)

  3. 返回的xxxService类,会在第一步提供给netty以及grpc框架, 收到消息时,会通过他的异步机制,分隔网络线程和业务线程,走到这边执行的地方。
    下面是官网摘抄的代码示例:

class	xxxService extend   xxxServiceImpl
     @Override
    public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) 
      int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
      int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
      int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
      int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

      for (Feature feature : features) 
        if (!RouteGuideUtil.exists(feature)) 
          continue;
        

        int lat = feature.getLocation().getLatitude();
        int lon = feature.getLocation().getLongitude();
        if (lon >= left && lon <= right && lat >= bottom && lat <= top) 
          responseObserver.onNext(feature);
        
      
      responseObserver.onCompleted();
    

点击关注,第一时间了解华为云新鲜技术~​

以上是关于grpc双向流究竟是什么情况?2段代码告诉你的主要内容,如果未能解决你的问题,请参考以下文章

扩展 gRPC 双向流式聊天服务

gRPC 流式传输极简入门指南

gRPC 流式传输极简入门指南

gRPC 流式传输极简入门指南

angular 数据双向绑定的终极奥义

gRPC双向数据流的交互控制(go语言实现)