grpc-java:正确处理客户端重试以进行服务流调用

Posted

技术标签:

【中文标题】grpc-java:正确处理客户端重试以进行服务流调用【英文标题】:grpc-java: Proper handling of retry on client for service streaming call 【发布时间】:2020-03-19 14:09:35 【问题描述】:

我正在尝试使用服务流和客户端上的异步存根在 grpc 上设置一个简单的发布/订阅模式。在将部分流式消息实现回客户端后,我想处理连接断开的场景。现在,我正在实施部分服务,例如关闭服务并且客户端应该从连接丢失中“恢复”。

我已经阅读并搜索了 google/github/so 上的重试机制,最后为流式传输消息的服务中的方法设置了重试策略。据我了解,当服务返回重试策略中定义的一些 retryableStatusCodes 时,重试机制应该起作用。在客户端引入重试策略后,我想对其进行测试,以下两个场景的结果让我对重试感到困惑。

第一个场景:

connect 过程被调用(大约 n 秒后故意没有消息流回客户端) 服务已关闭 onError 在客户端调用 服务再次启动 连接通过重试再次到达

第二种情况:

connect 过程被调用(大约 n 秒后第一条消息到达,消息在客户端的 onNext 处理程序中处理) 服务已关闭 onError 在客户端调用 服务再次启动 连接通过重试再次到达

总的来说,让我感到困惑的是,为什么这两种情况之间的行为会有所不同?为什么在第一种情况下检测到服务器返回 UNAVAILABLE 并尝试重试,但在第二种情况下即使状态相同,重试也不起作用?

这里是客户端connect调用、服务connect方法以及客户端重试策略设置的代码

client:

messageStub.withWaitForReady().connect(messagesRequest, new StreamObserver<>() 
    @Override
    public void onNext(MessageResponse messageResponse) 
        //process new message
        MessageDto message = new MessageDto();
        message.setBody(messageResponse.getBody());
        message.setTitle(messageResponse.getTitle());

        messageService.broadcastMessage(message);
    

    @Override
    public void onError(Throwable throwable) 
        //service went down
        LOGGER.error(throwable.getStackTrace());
    

    @Override
    public void onCompleted() 
        //This method should be called when user logs out of the application
        LOGGER.info(String.format("Message streaming terminated for user %d", userId));
    
);
service:

@Override
public void connect(MessageRequest request, StreamObserver<MessageResponse> responseObserver) 
    Long userId = request.getUserId();

    ServerCallStreamObserver<MessageResponse > serverCallStreamObserver =
        (ServerCallStreamObserver<MessageResponse >) responseObserver;
    serverCallStreamObserver.setOnCancelHandler(getOnCancelHandler(userId));
    registerClient(userId, serverCallStreamObserver);
    //responseObserver.onCompleted() is left out so connection is not terminated



@EventListener
public void listenForMessages(MessageEvent messageEvent) 
    //omitted code (just some data retrieving - populate conn and message vars)....

    MessageResponse.Builder builder = MessageResponse.newBuilder();
    StreamObserver<MessageResponse> observer = conn.getResponseObserver();
    builder.setType(message.getType());
    builder.setTitle(message.getTitle());
    builder.setBody(message.getBody());

    observer.onNext(builder.build())


retryPolicy:


  "methodConfig" : [
    
      "name": [
        
          "service": "ch.example.proto.MessageService",
          "method": "connect"
        
      ],
      "retryPolicy": 
        "maxAttempts": 10,
        "initialBackoff": "5s",
        "maxBackoff": "30s",
        "backoffMultiplier": 2,
        "retryableStatusCodes": ["UNAVAILABLE"]
      
    
  ]

【问题讨论】:

【参考方案1】:

问题在于接收消息提交 RPC。这在gRFC A6 Client Retries 中进行了讨论。它提到了Response-Headers,这是在服务器响应第一条消息时隐式发送的。

本质上,一旦 gRPC 将数据传回客户端,就无法自动重试。如果 gRPC 重试,它应该如何将新流与它已经响应的流结合起来?它应该跳过第一个N 响应吗?但是,如果现在的反应不同怎么办?元数据(通过Response-Headers 传递)的问题更严重,因为这些元数据无法再次提供给客户端。

gRPC 能够将客户端的请求重播到多个后端,但是一旦它开始接收来自后端的响应,它将变得“固定”到该后端并且无法更改其决定。

您将需要应用程序级重试来重新建立流。当客户端重新建立流时,它可能需要修改请求以通知服务器客户端已经收到了哪些消息。

【讨论】:

以上是关于grpc-java:正确处理客户端重试以进行服务流调用的主要内容,如果未能解决你的问题,请参考以下文章

csharp Azure DocumentDB .Net SDK执行Async方法的示例,重试以处理RequestRateTooLargeException或HTTP 429错误

java.net.HttpRetryException:由于服务器身份验证而无法重试,在流模式下

Angular Http 请求出错后重试

grpc-java 服务器立即关闭

grpc-java的ServerBuilder中executor的具体用途是啥?它只是执行处理程序方法吗?

远程服务器错误 请稍后重试 是啥原因?