i.grpc.internal.AbstractClientStream - 接收到的关于关闭流的数据意义

Posted

技术标签:

【中文标题】i.grpc.internal.AbstractClientStream - 接收到的关于关闭流的数据意义【英文标题】:i.grpc.internal.AbstractClientStream - Received data on closed stream meaning 【发布时间】:2021-09-25 08:59:30 【问题描述】:

我有一个 Spring 启动应用程序 (v2.2.10.RELEASE),它订阅了 pubSub 中的多个主题并提取异步数据并将其发送到其他地方。我没有使用 SpringGCP,只是使用原生谷歌库

这是我的订阅者设置:

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
            (PubsubMessage message, AckReplyConsumer consumer) -> 
                messages.add(message);
                consumer.ack();
            ;

    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver)
                                    .setParallelPullCount(2)
                                    .setFlowControlSettings(flowControlSettings)
                                    .setCredentialsProvider(credentialsProvider)
                                    .setExecutorProvider(executorProvider)
                                     //.setChannelProvider()
                                    .build();

由于流量大且消息量大(2 - 4 kb),我遇到了以下信息消息:

[grpc-default-worker-ELG-1-1] INFO  i.grpc.internal.AbstractClientStream - Received data on closed stream

首先,我不完全明白这意味着什么?我注意到的是,当这种情况发生时,传递的重复消息会增加。所以我认为这意味着 pubSub 试图通过一些消息联系订阅者,但由于某种原因订阅者还没有准备好,所以 pubSub 将尝试再次传递消息。因此会有更多重复,对吗?

在订阅者中使用TransportChannelProvider 可以解决这个问题吗?我对写得不好的文档的理解是,当当前使用的通道关闭时,这将创建一个新的传递通道,因此摆脱了以前的日志消息。

如果是,我如何定义频道目标字符串?我在哪里可以找到受管理频道的符合 NameResolver 的 URI。我的意思是sn-p是这样的:

    private TransportChannelProvider getChannelProvider() 
    ManagedChannel channel = ManagedChannelBuilder.forTarget(target).usePlaintext(true).build();
    return FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

我对 GCP 很陌生,如果我的问题不够连贯,请见谅

【问题讨论】:

【参考方案1】:

使用自定义 TransportChannelProvider 无法解决此类问题。这更有可能是堆栈中更深层次的问题,例如,在 gRPC 级别。此类错误 [1、2] 存在一些未解决的问题。

关于导致重复的原因,消息可能是通过已经关闭的流传递的(这与错误消息一致),因为它们被困在 gRPC 层的较低级别的缓冲区中因此最终成为随后通过另一个流传递和处理的消息的副本。这可能是围绕large backlogs of small messages 的文档中讨论的问题的一个版本。在 Java 客户端库的 v1.109.0 中已修复此问题,因此如果您使用的是旧版本,则值得更新。

如果重复仍然是一个问题,最好reach out to support 提供您的订阅名称和一些重复消息的消息 ID,以便他们可以查看这些消息的传递模式并进一步诊断如果这些重新交付是意外的。

【讨论】:

我使用的是 v1.110.3 所以这应该有修复吧?我也已经在使用 flowControl 并将 outStanding messageCount 限制为 200。但是对于(1-2 百万)发送消息,我仍然会收到数千个重复消息 数百万条消息中的几千条重复并不真正超出预期范围。

以上是关于i.grpc.internal.AbstractClientStream - 接收到的关于关闭流的数据意义的主要内容,如果未能解决你的问题,请参考以下文章