即使在确认后,应用程序也接受来自 google Pub/Sub 的重复消息

Posted

技术标签:

【中文标题】即使在确认后,应用程序也接受来自 google Pub/Sub 的重复消息【英文标题】:Application accepts duplicated messages from google Pub/Sub even after acknowledge 【发布时间】:2020-04-02 12:20:03 【问题描述】:

当我在 GCP 环境中测试应用程序时,我遇到了非常奇怪的浮动 bug()。我找不到具体的重现步骤,但它确实不时发生。

我看到该消息已成功确认:

2019-12-06 12:37:47.348  INFO 1 --- [sub-subscriber3] .i.g.MyAcknowledgementHandler : Acknowledged message - 1575635858865987

我有以下代码要确认:

        var generation = message.getHeaders().get("objectGeneration");
        pubSubMessage = message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class)
        pubSubMessage.ack().addCallback(
                v -> 
                    removeFromIdempotentStore(targetMessage, false);
                    log.info("Acknowledged message - ", generation);
                ,
                e -> 
                    removeFromIdempotentStore(targetMessage, false);
                    log.error("Failed to acknowledge message - ", generation, e);
                
        );

我还看到以下日志:

2019-12-06 12:37:48.868 WARN 1 --- [sub-subscriber1] c.b.m.i.MyDiscardedMessagesHandler : Duplicate message received GenericMessage [... headers=gcp_pubsub_acknowledgement=org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter$1@1abafe68, bxwid=12345, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@3c3efd63, idempotent.keys=[objectId.mixed emails.csv, objectGeneration.1575635858865987].....

而且它会无限重复。此外,我在订阅图中看到消息仍然存在(在确认回调调用之后)

丢弃逻辑:

....
.gateway(nexrFlow, idempotentByHeader("objectId")); 


Consumer<GatewayEndpointSpec> idempotentByHeader(String objectIdHeader) 
    return endpointSpec -> endpointSpec.advice(idempotentByHeaderInterceptor(objectIdHeader))
            .errorChannel(errorChannel())
            .replyTimeout(0L);


default IdempotentReceiverInterceptor idempotentByHeaderInterceptor(String header) 
    MessageProcessor<String> headerSelector = message -> headerExpression(header).apply(message);
    var interceptor = new IdempotentReceiverInterceptor(new MetadataStoreSelector(headerSelector, idempotencyStore()));
    interceptor.setDiscardChannel(idempotentDiscardChannel());
    return interceptor;

我不知道如何解决它。有什么想法吗?

【问题讨论】:

你检查了ObjectID 的值吗?我猜是在这个字段上进行了去重 @guillaume blaquiere 是的,我做到了。此标头包含文件名 您确定您的消息的唯一性吗?不是同时发送2个吗? @guillaume blaquiere 我 100% 确定这是相同的消息。他们总是有相同的 message_id @g*** 为了正确重现这一点,您能否向我们提供您的 Pub/Sub 客户端版本和AckDeadline? 【参考方案1】:

Pub/sub 旨在保证至少传递一条消息,因此这是预期行为。官方解释请查看product FAQs。

如上所述,频繁重复的原因之一是消息未在acknowledgement deadline 中确认。如果该消息的处理时间超过截止日期,则重新发送该消息。这就是我在之前的评论中要求 AckDeadline 的原因。默认情况下应该是 10 秒。您可以检查您如何在控制台中配置它,单击您正在使用的订阅。您可以尝试增加它以等待更多消息完成处理。通过在订阅中单击一次编辑来执行此操作。

但是,即使您在截止日期内确认,有时也会发生重复。为了保证至少一次交付,这是必要的。

【讨论】:

我检查过了 - 这不是一个选项 请在此处找到更多详细信息:***.com/questions/59248156/… 我关于成功确认后重复的问题 @g*** 您是否在测试前确认您的主题/订阅者已完全清除/清空?您可能有多个响应在等待。 100% 确定一切都是空的

以上是关于即使在确认后,应用程序也接受来自 google Pub/Sub 的重复消息的主要内容,如果未能解决你的问题,请参考以下文章

即使在更新 SHA 1 指纹以获取发布密钥后,Google 地图也无法在 Play 商店中发布后加载

Google PubSub Request 消息即使确认?

Unity:即使在播放器设置中未选中 x86 后,此版本也不符合 Google Play 64 位要求错误

如何实现集群中的每个节点都接受来自 Google 发布/订阅主题的消息?

JSP即使在更新后也显示来自数据库的相同数据

即使似乎授予访问权限,Google Calendar API在OAuth期间也会出现403错误