gRPC 流式调用

Posted dotNET跨平台

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了gRPC 流式调用相关的知识,希望对你有一定的参考价值。

gRPC 使用 Protocol buffers 作为接口定义语言(IDL)来描述服务接口和输入输出消息的结构,目前支持 4 种定义服务方法类型:

类型 说明
简单 RPC 客户端传入一个请求对象,服务端返回一个结果对象
客户端流式 RPC 客户端传入多个请求对象,服务端返回一个结果对象
服务端流式 RPC 客户端传入一个请求对象,服务端返回多个结果对象
双向流式 RPC 客户端传入多个请求对象,服务端返回多个结果对象

RPC 定义

简单 RPC:一般这种方式使用较多,如下:定义 SayHello 方法,输入 HelloRequest,返回 HelloResponse 。

1
2
3
4
5
6
7
8
9
10
11
service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse);
}

message HelloRequest {
string greeting = 1;
}

message HelloResponse {
string reply = 1;
}

而流式 RPC 定义与 简单 RPC 的区别只是在请求或返回参数前增加了 stream 关键词,如下:

1
2
3
4
5
6
7
8
service HelloService {
// 客户端流式 RPC
rpc SayHello1 (stream HelloRequest) returns (HelloResponse);
// 服务端流式 RPC
rpc SayHello2 (HelloRequest) returns (stream HelloResponse);
// 双向流式 RPC
rpc SayHello3 (stream HelloRequest) returns (stream HelloResponse);
}

gRPC 能支持流式调用本质是因为 gRPC 通信是基于 HTTP/2 实现的,HTTP/2 具有流的概念,流是为了实现 HTTP/2 的多路复用。流是服务器和客户端在 HTTP/2 连接内用于交换帧数据的独立双向序列,逻辑上可看做一个较为完整的交互处理单元,即表达一次完整的资源请求、响应数据交换流程。

使用场景

在 gRPC 中消息接收大小 GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH 默认是 4M,如果大于该值,则会提示:Error: grpc: received message larger than max (xxxxxx vs. 4194304),当然我们可以修改默认值解决问题,但如果默认值支持过大对服务器资源也是一种消耗,这时候其实应该考虑使用流式调用,有效将数据进行分批处理,提高性能。

示例

这里主要介绍一下双向流式 RPC(客户端和服务端流式 RPC 类似),完整代码请 前往这里查看 。双向流模拟功能是客户端流式输入文件路径,服务端针对每个文件每次最多读取 1M 的数据返回,客户端拿到数据后生成新文件。

接口定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
syntax = "proto3";

package GrpcStream;

service StreamTest {
// 双向流程 RPC
rpc BidirectionalStream(stream BidirectionalStreamRequest) returns (stream BidirectionalStreamResponse) {}
}

message BidirectionalStreamRequest {
// 文件路径
string file_path = 1;
}
message BidirectionalStreamResponse {
// 文件路径
string file_path = 1;
// 数据
bytes data = 2;
}

代码实现

这里是基于 .NET Core 3.0 使用 gRPC,可以通过 VS 预置的 gRPC 服务 模板来创建服务端,创建后将默认的 porto 文件替换成上面的内容。

服务端代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public override async Task BidirectionalStream(IAsyncStreamReader<BidirectionalStreamRequest> requestStream, IServerStreamWriter<BidirectionalStreamResponse> responseStream, ServerCallContext context)
{
var i = 0;
// 监听客户端数据输入
while (await requestStream.MoveNext())
{
// 打印次数
Console.WriteLine(i++);
using var fs = File.Open(requestStream.Current.FilePath, FileMode.Open);
var leftSize = fs.Length;
// 1M
var buff = new byte[1048576];
while (leftSize > 0)
{
var len = await fs.ReadAsync(buff);
leftSize -= len;
Console.WriteLine($"response {requestStream.Current.FilePath} {len} bytes");
// 流式返回数据
await responseStream.WriteAsync(new BidirectionalStreamResponse
{
FilePath = requestStream.Current.FilePath,
Data = ByteString.CopyFrom(buff, 0, len)
});
}
}
}

客户端代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 测试文件,key 是已存在的文件,value 是需要生成的文件
static readonly Dictionary<string, string> fileDic = new Dictionary<string, string>()
{
{@"d:\dapr\daprd_windows_amd64.zip", @"d:\dapr\daprd_windows_amd64_new.zip" },
{@"d:\dapr\injector_windows_amd64.zip", @"d:\dapr\injector_windows_amd64_new.zip" },
};
static StreamTest.StreamTestClient client;

static async Task Main(string[] args)
{
// 连接 gRPC 服务
var channel = GrpcChannel.ForAddress("https://localhost:5001");
client = new StreamTest.StreamTestClient(channel);
await BidirectionalStreamTestAsync();
Console.ReadKey();
}

static async Task BidirectionalStreamTestAsync()
{
using var call = client.BidirectionalStream();
var responseTask = Task.Run(async () =>
{
// 接收返回值
var iterator = call.ResponseStream;
// 监听服务端数据返回
while (await iterator.MoveNext())
{
Console.WriteLine($"write to new file {fileDic[iterator.Current.FilePath]} {iterator.Current.Data.Length} bytes");
// 写入新文件
using var fs = new FileStream(fileDic[iterator.Current.FilePath], FileMode.Append);
iterator.Current.Data.WriteTo(fs);
}
});

var rand = new Random();
foreach (var item in fileDic)
{
// 流式输入
await call.RequestStream.WriteAsync(new BidirectionalStreamRequest
{
FilePath = item.Key
});
await Task.Delay(rand.Next(200));
}
await call.RequestStream.CompleteAsync();
await responseTask;
}

执行结果:

参考资料

  • gRPC Concepts


以上是关于gRPC 流式调用的主要内容,如果未能解决你的问题,请参考以下文章

grpc应用之二 gRPC使用

互联网协议 — gRPC 的 4 种服务定义及调用方式

gRPC之流式调用原理http2协议分析

gRPC 流式传输极简入门指南

gRPC 流式传输极简入门指南

gRPC 流式传输极简入门指南