gRPC 流式调用 Posted 2021-04-11 dotNET跨平台
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了gRPC 流式调用相关的知识,希望对你有一定的参考价值。
gRPC 使用 Protocol buffers 作为接口定义语言(IDL)来描述服务接口和输入输出消息的结构,目前支持 4 种定义服务方法类型:
类型
说明
简单 RPC
客户端传入一个请求对象,服务端返回一个结果对象
客户端流式 RPC
客户端传入多个请求对象,服务端返回一个结果对象
服务端流式 RPC
客户端传入一个请求对象,服务端返回多个结果对象
双向流式 RPC
客户端传入多个请求对象,服务端返回多个结果对象
RPC 定义
简单 RPC:一般这种方式使用较多,如下:定义 SayHello
方法,输入 HelloRequest
,返回 HelloResponse
。
1 2 3 4 5 6 7 8 9 10 11
service HelloService { rpc SayHello (HelloRequest) returns (HelloResponse); } message HelloRequest { string greeting = 1; } message HelloResponse { string reply = 1; }
而流式 RPC 定义与 简单 RPC 的区别只是在请求或返回参数前增加了 stream
关键词,如下:
1 2 3 4 5 6 7 8
service HelloService { // 客户端流式 RPC rpc SayHello1 (stream HelloRequest) returns (HelloResponse); // 服务端流式 RPC rpc SayHello2 (HelloRequest) returns (stream HelloResponse); // 双向流式 RPC rpc SayHello3 (stream HelloRequest) returns (stream HelloResponse); }
gRPC 能支持流式调用本质是因为 gRPC 通信是基于 HTTP/2 实现的,HTTP/2 具有流的概念,流是为了实现 HTTP/2 的多路复用。流是服务器和客户端在 HTTP/2 连接内用于交换帧数据的独立双向序列,逻辑上可看做一个较为完整的交互处理单元,即表达一次完整的资源请求、响应数据交换流程。
使用场景
在 gRPC 中消息接收大小 GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH
默认是 4M,如果大于该值,则会提示:Error: grpc: received message larger than max (xxxxxx vs. 4194304)
,当然我们可以修改默认值解决问题,但如果默认值支持过大对服务器资源也是一种消耗,这时候其实应该考虑使用流式调用,有效将数据进行分批处理,提高性能。
示例
这里主要介绍一下双向流式 RPC(客户端和服务端流式 RPC 类似),完整代码请 前往这里查看 。双向流模拟功能是客户端流式输入文件路径,服务端针对每个文件每次最多读取 1M 的数据返回,客户端拿到数据后生成新文件。
接口定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
syntax = "proto3"; package GrpcStream; service StreamTest { // 双向流程 RPC rpc BidirectionalStream(stream BidirectionalStreamRequest) returns (stream BidirectionalStreamResponse) {} } message BidirectionalStreamRequest { // 文件路径 string file_path = 1; } message BidirectionalStreamResponse { // 文件路径 string file_path = 1; // 数据 bytes data = 2; }
代码实现
这里是基于 .NET Core 3.0 使用 gRPC,可以通过 VS 预置的 gRPC 服务 模板来创建服务端,创建后将默认的 porto 文件替换成上面的内容。
服务端代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
public override async Task BidirectionalStream(IAsyncStreamReader<BidirectionalStreamRequest> requestStream, IServerStreamWriter<BidirectionalStreamResponse> responseStream, ServerCallContext context) { var i = 0; // 监听客户端数据输入 while (await requestStream.MoveNext()) { // 打印次数 Console.WriteLine(i++); using var fs = File.Open(requestStream.Current.FilePath, FileMode.Open); var leftSize = fs.Length; // 1M var buff = new byte[1048576]; while (leftSize > 0) { var len = await fs.ReadAsync(buff); leftSize -= len; Console.WriteLine($"response {requestStream.Current.FilePath} {len} bytes"); // 流式返回数据 await responseStream.WriteAsync(new BidirectionalStreamResponse { FilePath = requestStream.Current.FilePath, Data = ByteString.CopyFrom(buff, 0, len) }); } } }
客户端代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
// 测试文件,key 是已存在的文件,value 是需要生成的文件 static readonly Dictionary<string, string> fileDic = new Dictionary<string, string>() { {@"d:\dapr\daprd_windows_amd64.zip", @"d:\dapr\daprd_windows_amd64_new.zip" }, {@"d:\dapr\injector_windows_amd64.zip", @"d:\dapr\injector_windows_amd64_new.zip" }, }; static StreamTest.StreamTestClient client; static async Task Main(string[] args) { // 连接 gRPC 服务 var channel = GrpcChannel.ForAddress("https://localhost:5001"); client = new StreamTest.StreamTestClient(channel); await BidirectionalStreamTestAsync(); Console.ReadKey(); } static async Task BidirectionalStreamTestAsync() { using var call = client.BidirectionalStream(); var responseTask = Task.Run(async () => { // 接收返回值 var iterator = call.ResponseStream; // 监听服务端数据返回 while (await iterator.MoveNext()) { Console.WriteLine($"write to new file {fileDic[iterator.Current.FilePath]} {iterator.Current.Data.Length} bytes"); // 写入新文件 using var fs = new FileStream(fileDic[iterator.Current.FilePath], FileMode.Append); iterator.Current.Data.WriteTo(fs); } }); var rand = new Random(); foreach (var item in fileDic) { // 流式输入 await call.RequestStream.WriteAsync(new BidirectionalStreamRequest { FilePath = item.Key }); await Task.Delay(rand.Next(200)); } await call.RequestStream.CompleteAsync(); await responseTask; }
执行结果:
参考资料
以上是关于gRPC 流式调用的主要内容,如果未能解决你的问题,请参考以下文章
grpc应用之二 gRPC使用
互联网协议 — gRPC 的 4 种服务定义及调用方式
gRPC之流式调用原理http2协议分析
gRPC 流式传输极简入门指南
gRPC 流式传输极简入门指南
gRPC 流式传输极简入门指南