已确认的消息正在重试

Posted

技术标签:

【中文标题】已确认的消息正在重试【英文标题】:Acknowledged messages getting retried 【发布时间】:2021-07-26 13:01:24 【问题描述】:

我正在使用 Google Pub/Sub ruby​​ 客户端来处理发送到多个主题的消息。对于收到的每条消息,我使用 ActiveJob 将其排入队列并确认它以将其标记为已处理。

subscription.listen do |msg|
  Rails.logger.debug("Processing message with id #msg.message_id")

  MyJob.perform_later(JSON.parse(msg.data))
  msg.acknowledge!
  
  Rails.logger.debug("ACKed message with id #msg.message_id")
end

但是,通过检查日志,我发现已经确认的消息正在一次又一次地处理(请参阅日志摘录):

2021-05-04 02:15:17.089 EDT "Processing message with id 2260372604401883"
2021-05-04 02:15:17.180 EDT "ACKed message with id 2260372604401883"
2021-05-04 02:17:58.121 EDT "Processing message with id 2260372604401883"
2021-05-04 02:17:58.186 EDT "ACKed message with id 2260372604401883"
2021-05-04 02:20:59.899 EDT "Processing message with id 2260372604401883"
2021-05-04 02:20:59.985 EDT "ACKed message with id 2260372604401883"
2021-05-04 02:22:21.083 EDT "Processing message with id 2260372604401883"
2021-05-04 02:22:21.394 EDT "ACKed message with id 2260372604401883"
2021-05-04 02:24:18.389 EDT "Processing message with id 2260372604401883"
2021-05-04 02:24:18.485 EDT "ACKed message with id 2260372604401883"
2021-05-04 02:25:54.274 EDT "Processing message with id 2260372604401883"
2021-05-04 02:25:54.385 EDT "ACKed message with id 2260372604401883"
2021-05-04 02:26:59.087 EDT "Processing message with id 2260372604401883"
2021-05-04 02:26:59.184 EDT "ACKed message with id 2260372604401883" 

Google 报告的未确认消息数量急剧增加,因此我怀疑 acknowledge! 方法的行为与预期不符。

Ruby 版本:2.6.6 google-cloud-pubsub 版本:2.6.1(最新)

【问题讨论】:

您能否提供有关您的架构/方案的更多信息?有关流程概述的一些详细信息?为什么要使用多个主题?发送到每个主题的消息是否不同?每个订阅者都应该像this docs中提到的那样确认收到的消息如果您有多个主题具有相同的消息,每个主题的订阅都应该确认消息,也许这就是原因? 感谢您的评论。我们使用多个主题按业务含义分隔事件。例如,当发生付款时,我们在payment-complete 主题中发布一条消息,当创建帐户时,我们在account-created 主题中发布一条消息。一条消息只属于一个主题,它永远不会跨主题共享。关于架构,这个项目订阅了大约 40 个主题。提供的 sn-p 包含在循环中,因此我们为每个订阅配置回调(入队和 ACK)。 【参考方案1】:

我以前从未见过这个问题,但我可以建议一些尝试:

    记录订阅者确认截止日期以确保它有足够的时间让MyJob.perform_later 返回。这实际上只是一个健全性检查,因为它很可能是默认的 60 秒,但您不妨检查一下。 在订阅者上配置错误处理程序:
subscriber = subscription.listen do |msg|
  # process message
  msg.acknowledge!
end

Rails.logger.debug("subscriber ack deadline: #subscriber.deadline")

# Register to be notified when unhandled errors occur.
subscriber.on_error do |error|
  Rails.logger.error(error.message)
end
    配置 gRPC 调试日志记录。通用记录器配置说明位于LOGGING.md。要使它们适应 Rails,您可以在 config/initializers/grpc_logging.rb 中添加以下内容:
module MyLogger
  def logger
    Rails.logger
  end
end

# Define a gRPC module-level logger method before grpc/logconfig.rb loads.
module GRPC
  extend MyLogger
end

【讨论】:

以上是关于已确认的消息正在重试的主要内容,如果未能解决你的问题,请参考以下文章

GCP 中的确认截止日期、消息保留时间、死信和重试策略

在重试同一条消息之前,GCM CCS 等待 XMPP 服务器的确认多长时间?

rabbit mq 手动重试机制

ActiveMQ重试机制

PubSub:如何设置没有指数退避的重试策略?

动态链接 - 域和路径前缀正在发布。请在一个月后重试