GCP PubSub Spring Boot 重复提取消息

Posted

技术标签:

【中文标题】GCP PubSub Spring Boot 重复提取消息【英文标题】:GCP PubSub Spring Boot repeat extract message 【发布时间】:2019-10-25 08:47:35 【问题描述】:

我需要有关 gcp pub/sus 问题的帮助。我有一个进程向 pubsub 发送 100 条带有过滤器的消息,另一个应用程序(在 spring boot 中)接收这些消息。当 Spring Boot 应用程序从 pubsub 接收消息(不是 pull)时,处理 100 条消息,但进入该进程,接收更多消息,在不同时间接收不同数量的消息,任何时间接收 120,另外 140,其他超过 200。我没有找到任何解决方案,这是我的代码:

    @Bean
    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public MessageHandler messageReceiver() 
        return message -> 
            System.out.println("Message arrived! Payload: " + new String((byte[]) message.getPayload()));
            //other process of app (call other api)
            AckReplyConsumer consumer = (AckReplyConsumer) message.getHeaders().get(GcpPubSubHeaders.ACKNOWLEDGEMENT);
            consumer.ack();
        ;
    

请帮帮我!!!

【问题讨论】:

一个好的开始方法是尝试增加确认截止日期和消息保留持续时间。供参考:cloud.google.com/pubsub/docs/faq#duplicatescloud.google.com/pubsub/docs/push 我试过了,但我有同样的问题,没有解决 【参考方案1】:

在 Google Cloud Pub/Sub 中,由于不同的原因可能会出现重复消息。需要记住的一件事是,Cloud Pub/Sub 提供至少一次交付,这意味着始终可能出现一定数量的重复,因此您的应用程序必须能够适应它们。不过,这么多重复似乎有点高。一般来说,重复通常可能由于以下原因而发生:

    发布者多次发送消息。如果发布者与 Cloud Pub/Sub 断开连接并再次发送相同的消息,则可能会发生这种情况。如果发生这种类型的重复,则消息将具有不同的消息 ID。 订阅者确认消息的时间过长。在您的代码中,您有//other process of app (call other api)。这个过程需要多长时间?如果超过确认消息的截止日期,则消息将被重新传递。请记住,如果此其他进程需要为所有消息获取锁,则可能会出现争用问题,即尝试同时获取这些锁的请求过多,从而导致处理延迟。默认情况下,消息的确认期限为十秒。使用 Java 客户端库时,截止日期由 maxAckExtensionPeriod 自动延长,默认为一小时。这个属性也可以在 Spring 的 DefaultSubscriberFactory 中设置。 消息根本不被确认。如果异常阻止对ack 的调用,或者出现死锁导致该行代码永远无法到达,则将重新传递消息。 用例是large backlog of small messages 之一。在这种情况下,缓冲区很容易以导致重新传递消息的方式填满客户端。

【讨论】:

嗨,Kamal,确认消息是通过 .ack() 方法,还是在接收该消息的方法结束时确认??,grettings

以上是关于GCP PubSub Spring Boot 重复提取消息的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot GCP:将 PubSub 应用程序部署到 App Engine 标准环境时出现“Google 凭据”异常

从现有的 GCP pubsub 订阅中消费

使用 GCP pubsub 的 Spring Cloud Stream 消费者的并发设置

GCP Pubsub 中的消息丢失和重复

在 Spring Cloud GCP pubsub 中创建特定于消息通道的线程

如何使用 gsutil 向 GCP 存储桶添加 pubsub 通知