Grpc Streaming 你造?

Posted 技术原始积累

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Grpc Streaming 你造?相关的知识,希望对你有一定的参考价值。

一、前言

grpc 是一个由 google 推出的、高性能、开源、通用的 rpc 框架。它是基于 HTTP2 协议标准设计开发,默认采用 Protocol Buffers 数据序列化协议,支持多种开发语言。

一般业务场景下,我们都是使用grpc的simple-rpc模式,也就是每次客户端发起请求,服务端会返回一个响应结果的模式。

但是grpc除了这种一来一往的请求模式外,还有流式模式,下面我们一一道来。

二 grpc服务端流

服务端流模式是说客户端发起一次请求后,服务端在接受到请求后,可以以流的方式,使用同一连接,不断的向客户端写回响应结果,客户端则可以源源不断的接受到服务端写回的数据。Grpc Streaming 你造?

下面我们通过简单例子,来说明如何使用,服务端端流。要实现服务端流,需要把grpc方法定义如下:

message Metric { google.protobuf.Timestamp timestamp = 1; int64 metric = 2;}
message Average { double val = 1;}
service MetricsService { rpc collectServerStream (Metric) returns (stream Average);}

如上rpc方法的返回值类型前添加stream标识 是服务端流,然后服务端实现代码如下:

public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase {
public StreamObserver<StreamingExample.Average> responseObserverT; /** * 服务端流 * * @param request * @param responseObserver */ @Override public void collectServerStream(com.example.server.streaming.StreamingExample.Metric request, io.grpc.stub.StreamObserver<com.example.server.streaming.StreamingExample.Average> responseObserver) { //保存流式响应对象 this.responseObserverT = responseObserver; }

最后启动服务,并当流式对象不为null时候,写回数据到客户端:

public class MetricsServerServerStream { public static void main(String[] args) throws InterruptedException, IOException { //启动服务 MetricsServiceImpl metricsService = new MetricsServiceImpl(); Server server = ServerBuilder.forPort(8080).addService(metricsService).build(); server.start();
//获取steam响应对象,不断的向客户端写回数据 new Thread(new Runnable() { @Override public void run() { for (; ; ) { if (null != metricsService.responseObserverT) { metricsService.responseObserverT.onNext(StreamingExample.Average.newBuilder() .setVal(new Random(1000).nextDouble()) .build()); System.out.println("send to client"); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }
} }).start(); server.awaitTermination(); }}


下面我们看客户端代码,客户端代码如下:

public class MetricsClientServerStream { public static void main(String[] args) throws InterruptedException { //获取客户端桩对象 ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build(); MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel);
//发起rpc请求,设置StreamObserver用于监听服务器返回结果 stub.collectServerStream(StreamingExample.Metric.newBuilder().setMetric(1L).build(), new StreamObserver<StreamingExample.Average>() { @Override public void onNext(StreamingExample.Average value) { System.out.println(Thread.currentThread().getName() + "Average: " + value.getVal()); }
@Override public void onError(Throwable t) { System.out.println("error:" + t.getLocalizedMessage()); }
@Override public void onCompleted() { System.out.println("onCompleted:");
} });}


如上启动客户端后,可以看到StreamObserver的onNext方法会源源不断的接受到服务端返回的数据。

服务端流使用场景:

  • 客户端请求一次,但是需要服务端源源不断的返回大量数据时候,比如大批量数据查询的场景。

  • 比如客户端订阅服务端的一个服务数据,服务端发现有新数据时,源源不断的吧数据推送给客户端。

三 grpc客户端流

客户端流模式是说客户端发起请求与服务端建立链接后,可以使用同一连接,不断的向服务端传送数据,等客户端把全部数据都传送完毕后,服务端才返回一个请求结果。

下面我们通过简单例子,来说明如何使用,客户端流。要实现服务端流,需要把grpc方法定义如下:

service MetricsService { rpc collectClientStream (stream Metric) returns (Average);}

如上rpc方法的入参类型前添加stream标识 是客户端流,然后服务端实现代码如下:

public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase {
/** * 客户端流 * * @param responseObserver * @return */ @Override public StreamObserver<StreamingExample.Metric> collectClientStream(StreamObserver<StreamingExample.Average> responseObserver) { return new StreamObserver<StreamingExample.Metric>() { private long sum = 0; private long count = 0;
@Override public void onNext(StreamingExample.Metric value) { System.out.println("value: " + value); sum += value.getMetric(); count++; }
@Override public void onError(Throwable t) { System.out.println("severError:" + t.getLocalizedMessage()); responseObserver.onError(t); }
@Override public void onCompleted() { responseObserver.onNext(StreamingExample.Average.newBuilder() .setVal(sum / count) .build()); System.out.println("serverComplete: ");
} }; }


如上代码,服务端使用流式对象的onNext方法不断接受客户端发来的数据,然后等客户端发送结束后,使用onCompleted方法,把响应结果写回客户端。

下面我们看客户端代码,客户端代码如下:

public class MetricsClient2 { public static void main(String[] args) throws InterruptedException { //1.创建客户端桩 ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build(); MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel);
//2.发起请求,并设置结果回调监听 StreamObserver<StreamingExample.Metric> collect = stub.collectClientStream(new StreamObserver<StreamingExample.Average>() { @Override public void onNext(StreamingExample.Average value) { System.out.println(Thread.currentThread().getName() + "Average: " + value.getVal()); }
@Override public void onError(Throwable t) { System.out.println("error:" + t.getLocalizedMessage()); }
@Override public void onCompleted() { System.out.println("onCompleted:");
} });
//3.使用同一个链接,不断向服务端传送数据 Stream.of(1L, 2L, 3L, 4L,5L).map(l -> StreamingExample.Metric.newBuilder().setMetric(l).build()) .forEach(metric -> { collect.onNext(metric); System.out.println(metric); });
Thread.sleep(3000); collect.onCompleted(); channel.shutdown().awaitTermination(50, TimeUnit.SECONDS); }}

如上启动客户端后,可以看到代码3会把数据1,2,3,4,5通过同一个链接发送到服务端,然后等服务端结束完毕数据后,会计算接受到的数据的平均值,然后把平均值写回客户端。然后代码2设置的监听器的onNext方法就会被回调,然后打印出服务端返回的平均值3。

客户端流使用场景:

  • 比如数据批量计算场景:如果只用simple rpc的话,服务端就要一次性收到大量数据,并且在收到全部数据之后才能对数据进行计算处理。如果用客户端流 rpc的话,服务端可以在收到一些记录之后就开始处理,也更有实时性。

四 grpc双向流

双向流意味着客户端向服务端发起请求后,客户端可以源源不断向服务端写入数据的同时,服务端可以源源不断向客户端写入数据。

下面我们通过简单例子,来说明如何使用双向流。要实现双向流,需要把grpc方法定义如下:

service MetricsService { rpc collectTwoWayStream (stream Metric) returns (stream Average);}

如上rpc方法的入参和返回类型前添加stream标识 是双向流,然后服务端实现代码如下:

public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase {

public StreamObserver<StreamingExample.Average> responseObserverT;
/** * 双向流 * * @param responseObserver * @return */ @Override public StreamObserver<StreamingExample.Metric> collectTwoWayStream(StreamObserver<StreamingExample.Average> responseObserver) { this.responseObserverT = responseObserver; return new StreamObserver<StreamingExample.Metric>() { private long sum = 0; private long count = 0;
@Override public void onNext(StreamingExample.Metric value) { System.out.println("value: " + value); sum += value.getMetric(); count++; }
@Override public void onError(Throwable t) { System.out.println("severError:" + t.getLocalizedMessage()); responseObserver.onError(t); }
@Override public void onCompleted() { responseObserver.onNext(StreamingExample.Average.newBuilder() .setVal(sum / count) .build()); System.out.println("serverComplete: ");
} }; }



如上代码,服务端使用流式对象的onNext方法不断接受客户端发来的数据,然后等客户端发送结束后,使用onCompleted方法,把响应结果写回客户端。并且服务端保存了流式对象responseObserverT用来不断的写数据到客户端

双向流使用场景:

  • 需要双向数据交互的场景,比如聊天机器人,游戏室等。

五 StreamObserver转换为反应式框架流

StreamObserver是grpc自己定义的一个流式接口,其定义如下:

 
   
   
 
  1. public interface StreamObserver<V> {

  2. void onNext(V var1);


  3. void onError(Throwable var1);


  4. void onCompleted();

  5. }

grpc虽然提供了流式接口,但是其并没有提供便捷的流操作符,而我们知道Reactor或者Rxjava这些反应式编程框架,本身是提供了丰富便捷的流操作符的。所以我们想看看如何把StreamObserver转换为反应式框架流,由于Reactor是spring5自带的,所以我们看看如何把StreamObserver转换为Reactor的Flux流对象。

转换代码如下:

public class StreamObserverPublisher implements Publisher<StreamingExample.Average>, StreamObserver<StreamingExample.Average> {
private Subscriber<? super StreamingExample.Average> subscriber;
@Override public void onNext(StreamingExample.Average l) { subscriber.onNext(l); }
@Override public void onError(Throwable throwable) { subscriber.onError(throwable); }
@Override public void onCompleted() { subscriber.onComplete(); }
@Override public void subscribe(Subscriber<? super StreamingExample.Average> subscriber) { this.subscriber = subscriber; this.subscriber.onSubscribe(new BaseSubscriber() { }); }}


public class MetricsClientTwoWay { public static void main(String[] args) throws InterruptedException { //创建客户端桩 ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build(); MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel);
//转换StreamObserver流为Flux流 StreamObserverPublisher streamObserverPublisher = new StreamObserverPublisher(); Flux<StreamingExample.Average> flux = Flux.from(streamObserverPublisher); //订阅流,缓存,并消费 flux.buffer(4).subscribe(o -> System.out.println("ele:" + o.size())); // must be done before executing the gRPC request
//发起rpc请求 StreamObserver<StreamingExample.Metric> collect = stub.collectTwoWayStream(streamObserverPublisher);}


六 总结

grpc除了提供了simple-rpc还提供了双向流操作,大家可以结合自己的业务场景,选择性使用。另外为了使用反应式框架丰富的流操作符,我们可以便捷的把StreamObserver流转换为Flux流。

另外流式grpc结合伪流式db组件  可以起到意想不到的效果


戳下面阅读

以上是关于Grpc Streaming 你造?的主要内容,如果未能解决你的问题,请参考以下文章

gRPC (Cpp) Streaming - 如果在 grpc_impl::ServerReaderWriter::Read 期间一侧挂起或忘记关闭流,会发生啥?

带入gRPC:gRPC Streaming, Client and Server

gRPC-Web中的拦截器

PICE:MongoDBStreaming - gRPC -MGO Service

哈哈,你造原来程序猿这么多长处嘛

javascript基础修炼(12)——手把手教你造一个简易的require.js