gRPC Java 客户端能否通过长期存在的 gRPC 流并行发送多个请求以及如何管理 N 个流

Posted

技术标签:

【中文标题】gRPC Java 客户端能否通过长期存在的 gRPC 流并行发送多个请求以及如何管理 N 个流【英文标题】:Can gRPC Java client send multiple requests in parallel over a long lived gRPC stream and how to manage N streams 【发布时间】:2021-11-11 16:44:46 【问题描述】:

我正在使用“流式 RPC”API,其中 MyRequests 和 MyResponse 都是流式传输的

service MyStreamedService 
  rpc myOperation(**stream** MyRequest) returns (**stream** MyResponse)

这是一个稍微简化的类,它封装了一个 gRPC 流;

public class MyStreamWrapper implements StreamObserver<MyResponse> 
  public MyStreamWrapper(ManagedChannel myChannel) 
    myStub = MyStreamedServiceGrpc.newStub(myChannel);
    // create a stream and maintain a long lived reference to the stream via StreamObserver's
    myStream = myStub.myOperation(this);
  
  
  @Override
  public void onNext(MyResponse r) 
    // handle the response (not shown)
  
  
  @Override
  public void onError(Throwable t) 
    // very unfortunate that there is no error code in this API !
    // throttle (not shown but if I don't throttle, eats CPU)
    // Create a new stream
    myStream = myStub.myOperation(this);
  
  
  @Override
  public void onCompleted() 
    // server has called StreamObserver<MyRequest>.onCompleted
    // Create a new stream using the async API
    myStream = myStub.myOperation(this);
  
  
  // Context: many threada that want to send a request asynchronously
  public void send(MyRequest r) 
    synchronized(myStream) 
      myStream.onNext(r);
    
  

问题

    为什么访问myStream需要在send方法中同步?我想了解为什么我必须同步想要在同一流上并行发送无序请求的线程。如果每个请求都打包在带有自己的 stream-id 的 HTTP2 DATA 帧中,那么这只是 gRPC 客户端的 Java 实现所独有的吗? 当线程从 send 方法返回时,保证会发生什么?
请求在 gRPC 客户端中缓冲...我认为这必须是最低限度 请求作为 HTTP2 流帧在线上? 代理已收到请求? 服务器已收到但未处理? 服务器已收到并发送响应?
    鉴于客户端线程一次同步到一次 onNext 调用,客户端是否可以使服务器超载或通过在上述发送方法中阻塞客户端线程来施加背压?我在流量下看到 Streams 正在关闭,并且出现类似“INTERNAL: RST_STREAM closed stream.HTTP/2 error code: PROTOCOL_ERROR”之类的错误。 鉴于创建流的成本很低,维护和重用 myStream 是否不寻常? 鉴于只能将流定向到一台服务器,我认为我需要向上面的简单类添加更多代码以便在创建通道时创建 N myStream 然后循环发送方法N 个 myStream 中的每一个。不幸的是,没有 API 可以确定 myStream 当前是否正忙于处理另一个 RPC 请求。或者,我可以动态创建新的流并添加一个信号量(size:N) 来限制尝试使用它们的线程数。

我认为我理解的事情......

gRPC 通道是子通道的集合,gRPC 负责创建从客户端通过不同子通道到每个服务器的网络连接 gRPC 流通过单个子通道连接到每个服务器,并按顺序处理请求。 gRPC 流基于 HTTP2 流,它是一系列具有相同流 ID 的帧。 我还看到一个 gRPC 流被描述为单个 gRPC 调用,并且理解为每个请求使用一个新流很便宜,但在我上面的工作示例中,它也可以长期存在(或者至少 gRPC Java客户端提供了一个让它看起来很长寿的对象) “gRPC 的核心是流式传输。一元(单个请求,单个响应)和服务器流式处理(单个请求)只是用于生成更清晰的 API 或更优化的 I/O 行为的特殊情况。但是在线上,一切看起来都和流媒体一样。” Eric Anderson 在How is gRPC client streaming implemented 中的回答帮助我理解了为什么一元 API 在服务器端包含一个 StreamObserver。

【问题讨论】:

澄清:在上面的问题中,“请求”表示我的自定义 POJO MyRequest 的一个实例。但是我现在在datatracker.ietf.org/doc/html/rfc7540#section-8.1 中看到,HTTP/2 流请求可能涉及许多帧 HEADERS、DATA、DATA、END_STREAM 等 HTTP/2 请求可能存在很长时间(实际上是在流的持续时间内)并在 DATA 帧中包含许多 MyRequest。 请编辑问题以将其限制为具有足够详细信息的特定问题,以确定适当的答案。 @Community 请为上述机器人的 AI 提供更好的训练数据,以便为其烦人的言论仅识别足够的帖子;-] 【参考方案1】:

事实证明,所有答案都可以在 HTTP/2 规范 https://datatracker.ietf.org/doc/html/rfc7540# 中找到,这非常易读。

    Eric Anderson 回答“请注意,该对象不是线程安全的,因此如果您同时从多个线程调用它的方法,您应该进行自己的锁定/同步”。并没有真正解释为什么,我猜 Unary API 不需要同步,所以它们没有为 Streaming API 同步 应用程序请求至少在一个缓冲区中,准备好在 HTTP DATA 帧中连接到网络上。线程不会被阻塞等待任何确认它已被接收。无论如何,数据帧都不会被确认。由于 HTTP2 流控制以防止远端过载,应用程序可能无法发送数据,但即使在这种情况下,线程似乎也不太可能被阻塞。 如上所述,流控制在https://datatracker.ietf.org/doc/html/rfc7540#section-6.9 中进行了描述。服务器授予客户端发送一定数量字节的权限,并且客户端必须在分配使用完毕后停止。 不,不寻常。 Here,Eric Anderson 说“对于完全异步处理,我们希望您在当前线程上启动任何 I/O,然后在 I/O 完成后从其他线程调用 StreamObserver 上的方法。”。与Java gRPC server for long-lived streams effective implementation 重复 根据上面 Eric Anderson 的回复,我认为缓存流引用并重新使用它是正常的。但是,我不知道为什么 Java 通道 API 不包含方法 int getStreamLimit() ,因为客户端需要知道根据在创建通道期间交换的 HTTP2 SETTINGS 消息中的 SETTINGS_MAX_CONCURRENT_STREAMS 值允许使用的许多流。

一般评论: 我发现 gRPC 文档很难理解,直到我意识到大多数更深层次的问题都可以通过研究 HTTP2 来回答 例如 HTTP2 请求与 proto IDL 中的应用程序请求对象不是 1:1 对应的。关键点:HTTP2 请求在流的持续时间内持续,并且可以携带许多应用程序请求对象。

【讨论】:

【参考方案2】:
    首先,HTTP/2 消息可能很大,因此在一般情况下,将 HTTP/2 帧放在网络上并不是原子的,因此多个线程可能同时向同一个网络套接字写入。此外,还有与流控制相关的缓冲:如果流的另一端尚未准备好接受更多消息,则它们需要在您端进行缓冲,因此多个线程可能会同时写入同一个缓冲区。 “请求在 gRPC 客户端中缓冲” 正确答案:StreamObserver 的方法是异步的,它们返回非常快,实际的网络通信将在某个时候由另一个线程执行。 发送消息时,您应该尊重对方的准备情况,使用CallStreamObserver:isReady() 和setOnReadyHandler(...) 中的方法(您始终可以将出站StreamObserver 转换为CallStreamObserver)。如果忽略对方的准备情况,gRPC 将在其内部缓冲区中缓冲消息,直到对方准备好。但是,由于第 1 点中描述的缓冲,这在某些情况下可能会导致内存使用过多。 顺便说一句:你可能想看看官方的 copyWithFlowControl() 辅助方法和我自己的 DispatchingOnReadyHandler 类。 我猜您的意图是始终打开 RPC:如果是这样,您的代码似乎没问题。然而问题是您是否应该使用单个双向调用与多个一元:如果服务器处理 1 个请求消息与其他请求消息的处理不紧密相关(即:服务器不需要不断维护单个,在- 与所有请求消息相关的内存状态),那么一元调用会更好,至少有两个原因: 4.1。不需要第 1 点中描述的同步。 4.2.您将更好地利用服务器负载平衡。 在正常情况下,启动一个新的一元 RPC 的开销很小,因为它将在 现有 HTTP/2 连接上打开一个新流。 但是,如果某些请求消息的服务器端处理可能与其他一些先前的请求消息有关,那么您确实需要客户端流。但是,您应该尽可能尝试关闭并更新 RPC,以允许服务器平衡流量。 如果您的客户端作为某个服务器应用程序的一部分运行,那么grpclb 策略是最常见的负载平衡选择:它将维护到多个可用后端的多个 HTTP/2 连接的集合,每个连接可能有多个HTTP/2 流(HTTP/2 流与 gRPC 流对应 1-1)。此外,grpclb 将主动探测这些连接以验证它们是否健康,并自动重新发出 DNS 查询(或任何其他名称解析服务,如果您使用自定义 NameResolver)以查看是否在需要时添加了任何新后端。如果您想使用它,请记住包含 grpc-grpclb 运行时依赖项。有关服务器应用程序到服务器应用程序案例中负载平衡和名称解析的更多信息,请参阅答案底部的注释。 如果您的客户端在 android 上运行,则 grpclb 无法正常使用(大多数 android 设备缺乏运行后台负载平衡器的功能,即使可能它也会很快耗尽设备的电池),但您的连接通常会经历一些负载-平衡器站在后端服务器前面。在这种情况下,每个新的 RPC 通常会转到占用最少的后端。 但是,由于您似乎只维护了 1 个长期通话,因此您的所有请求消息都将发送到同一个后端,直到通话被“更新”:这就是为什么我建议在前一点中尽可能使用一元通话。这比实现自己的负载平衡要简单得多,所以如果可能的话,它应该是首选。

关于“我理解的事情”部分的说明:“子通道”通常基本上是 HTTP/2 连接:一个通道是可能多个 HTTP/2 连接的集合(取决于客户端能力:见第 5 点)到多个后端(当然取决于服务器配置),每个连接可以有多个独立的流(每 1 个 gRPC 调用 1 个 HTTP/2 流)。

关于服务器应用到服务器应用 gRPC 的负载平衡和名称解析的几点说明:

客户端查找可用后端的最简单和最常用的方法是通过 DNS 解析。更复杂的机制包括 xDS 或 Consul 或自定义 NameResolvers 等。 特别是在同一个 k8s 集群中同时部署后端和客户端服务器应用时,最常见的方式是将后端部署为无头服务,以便客户端可以通过集群内获取所有后端 pod关于&lt;backed-service-name&gt;.&lt;k8s-namespace&gt;.svc.cluster.local 的DNS 查询。例如.forTarget("myGrpcBackendService.default.svc.cluster.local:6000")。 默认情况下,使用grpclb 的客户端将尝试尽可能长时间地保持其与后端的连接集,因此即使 RPC 的生命周期很短,底层连接也会坚持到同一组后端。为了强制执行定期重新平衡,服务器可以使用NettyServerBuilder 中的方法设置最长连接寿命:maxConnectionAge(...)、maxConnectionAgeGrace(...) 和maxConnectionIdle(...)。

【讨论】:

很棒的答案 - 非常感谢。回复:1-2。有道理,我现在正在寻找link Re:3 的答案。很棒的小费。鉴于我维护每个通道的 N 个流(也称为 N RPC)池,我一直在寻找一种方法让我的 N-stream LB 算法选择一个“健康”流。 Re:5 我设置了 ManagedChannelBuilder .defaultLoadBalancingPolicy("round_robin") 因为 Java 文档说默认策略是 "pick_first" Re:4 是的,我一直想打开一个 RPC。我有比 gRPC 客户端更多的 gRPC 服务器,所以这是一个 Linux 服务器到服务器的场景。我认为我需要使用流 API,因为尽管在同一流上发送的许多 MyRequest POJO 不相关,但其中一些是相关的,在这种情况下需要维护排序。我知道我的长寿流应该定期关闭以允许扩展服务器端。 PS我试图将您的答案标记为问题的已接受答案,但没有任何反应。 @DiarmuidLeonard 如果是 server2server,那么您绝对应该使用 grpclb LB 策略。在这样做之前,只需确保在运行时依赖项中包含grpc-grpclb:它会主动探测连接是否健康,并定期重新发出 DNS 查询以查看是否添加了新的后端(如果您的服务器作为无头服务部署在 k8s 上,然后客户端的 grpclb 将自动将任何新创建的后端 pod 添加到其列表中)。同样,如果您使用短期 RPC,这会更好,因为不能将长期存在的 RPC 移动到另一个后端。 @DiarmuidLeonard 关于相关请求对象:是的,这听起来是一个很好的理由。在这种情况下,我想到的唯一改进是关闭 RPC,如果碰巧你知道在某个时候没有新请求与任何以前的请求相关。顺便说一句:这些不再是 POJO,因为它们是协议缓冲区 ;-) @DiarmuidLeonard 是否存在给定请求可能与仅从同一线程发送的另一个请求相关的情况?如果是这样,那么如果您的每个线程都有自己的 RPC,那可能会更容易和更优化。

以上是关于gRPC Java 客户端能否通过长期存在的 gRPC 流并行发送多个请求以及如何管理 N 个流的主要内容,如果未能解决你的问题,请参考以下文章

grpc 示例是不是旨在跨语言交流?

gRPC

GRPC Ver 1.22.0 DNS 解析失败

Java 开发 gRPC 服务和客户端

GRPC 服务问题

grpc-go源码剖析六十一之假设在一条调用链上,存在多个grpc服务的调用,如A服务调用B服务调用C服务,那么他们的超时时间如何?