GCP PubSub Spring Boot 重复提取消息
Posted
技术标签:
【中文标题】GCP PubSub Spring Boot 重复提取消息【英文标题】:GCP PubSub Spring Boot repeat extract message 【发布时间】:2019-10-25 08:47:35 【问题描述】:我需要有关 gcp pub/sus 问题的帮助。我有一个进程向 pubsub 发送 100 条带有过滤器的消息,另一个应用程序(在 spring boot 中)接收这些消息。当 Spring Boot 应用程序从 pubsub 接收消息(不是 pull)时,处理 100 条消息,但进入该进程,接收更多消息,在不同时间接收不同数量的消息,任何时间接收 120,另外 140,其他超过 200。我没有找到任何解决方案,这是我的代码:
@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver()
return message ->
System.out.println("Message arrived! Payload: " + new String((byte[]) message.getPayload()));
//other process of app (call other api)
AckReplyConsumer consumer = (AckReplyConsumer) message.getHeaders().get(GcpPubSubHeaders.ACKNOWLEDGEMENT);
consumer.ack();
;
请帮帮我!!!
【问题讨论】:
一个好的开始方法是尝试增加确认截止日期和消息保留持续时间。供参考:cloud.google.com/pubsub/docs/faq#duplicatescloud.google.com/pubsub/docs/push 我试过了,但我有同样的问题,没有解决 【参考方案1】:在 Google Cloud Pub/Sub 中,由于不同的原因可能会出现重复消息。需要记住的一件事是,Cloud Pub/Sub 提供至少一次交付,这意味着始终可能出现一定数量的重复,因此您的应用程序必须能够适应它们。不过,这么多重复似乎有点高。一般来说,重复通常可能由于以下原因而发生:
-
发布者多次发送消息。如果发布者与 Cloud Pub/Sub 断开连接并再次发送相同的消息,则可能会发生这种情况。如果发生这种类型的重复,则消息将具有不同的消息 ID。
订阅者确认消息的时间过长。在您的代码中,您有
//other process of app (call other api)
。这个过程需要多长时间?如果超过确认消息的截止日期,则消息将被重新传递。请记住,如果此其他进程需要为所有消息获取锁,则可能会出现争用问题,即尝试同时获取这些锁的请求过多,从而导致处理延迟。默认情况下,消息的确认期限为十秒。使用 Java 客户端库时,截止日期由 maxAckExtensionPeriod 自动延长,默认为一小时。这个属性也可以在 Spring 的 DefaultSubscriberFactory 中设置。
消息根本不被确认。如果异常阻止对ack
的调用,或者出现死锁导致该行代码永远无法到达,则将重新传递消息。
用例是large backlog of small messages 之一。在这种情况下,缓冲区很容易以导致重新传递消息的方式填满客户端。
【讨论】:
嗨,Kamal,确认消息是通过 .ack() 方法,还是在接收该消息的方法结束时确认??,grettings以上是关于GCP PubSub Spring Boot 重复提取消息的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot GCP:将 PubSub 应用程序部署到 App Engine 标准环境时出现“Google 凭据”异常
使用 GCP pubsub 的 Spring Cloud Stream 消费者的并发设置