为啥我会收到 50% 的 GCP Pub/Sub 消息重复?

Posted

技术标签:

【中文标题】为啥我会收到 50% 的 GCP Pub/Sub 消息重复?【英文标题】:Why am I getting 50% of GCP Pub/Sub messages duplicated?为什么我会收到 50% 的 GCP Pub/Sub 消息重复? 【发布时间】:2021-11-24 01:13:19 【问题描述】:

我正在运行分析管道。

吞吐量约为每秒 11 条消息。 我的 Pub/Sub 主题安排了大约 200 万条消息。 80 个 GCE 实例正在并行拉取消息。

这是我的主题和订阅:

gcloud pubsub topics create pipeline-input

gcloud beta pubsub subscriptions create pipeline-input-sub \
    --topic pipeline-input \
    --ack-deadline 600 \
    --expiration-period never \
    --dead-letter-topic dead-letter

这是我拉消息的方式:

import  PubSub, Message  from '@google-cloud/pubsub'

const pubSubClient = new PubSub()

const queue: Message[] = []

const populateQueue = async () => 
  const subscription = pubSubClient.subscription('pipeline-input-sub', 
    flowControl: 
      maxMessages: 5
    
  )
  const messageHandler = async (message: Message) => 
    queue.push(message)
  
  subscription.on('message', messageHandler)


const processQueueMessage = () => 
  const message = queue.shift()
  try 
    ...
    message.ack()
   catch 
    ...
    message.nack()
  
  processQueueMessage()


processQueueMessage()

处理时间约为 7 秒。

这是许多类似的重复案例之一。 相同的消息被传递 5 (!!!) 次到不同的 GCE 实例:

03:37:42.377 03:45:20.883 03:48:14.262 04:01:33.848 05:57:45.141

所有 5 次消息都被成功处理并.ack()ed。输出包含的消息比输入多 50%!我很清楚"at least once" behavior,但我认为它可能会重复 0.01% 的消息,而不是 50%。

主题输入 100% 没有重复。我通过云监控验证了主题输入法和未确认消息的数量。数字匹配:发布/订阅主题中没有重复项。

更新:

    看起来所有这些重复都是由于确认截止日期到期而创建的。我 100% 确信我在 600 秒截止日期之前确认了 99.9% 的消息。

【问题讨论】:

【参考方案1】:

预计会有一些重复,但 50% 的重复率肯定很高。第一个问题是,这些是发布端重复还是订阅端重复?前者是在重试发布同一消息时创建的,导致同一消息的多次发布。这些消息将具有不同的消息 ID。后者是由向订阅者重新传递相同的消息引起的。这些消息具有相同的消息 ID(尽管有不同的确认 ID)。

听起来您已验证这些是订阅端重复项。因此,正如您提到的,可能的原因是过期的确认截止日期。问题是,为什么消息超过了确认期限?需要注意的一点是,在使用客户端库时,订阅中设置的确认截止日期并不是使用的那个。相反,客户端库尝试根据客户端库设置和第 99 个百分位 ack 延迟来优化 ack 截止日期。然后它更新消息的租约,直到 max_lease_duration property of the FlowControl object 传递到 subscribe 方法。默认为一小时。

因此,为了使消息保持租用状态,客户端库必须能够向服务器发送modifyAckDeadline 请求。重复的一个可能原因是客户端无法发送这些请求,这可能是由于机器过载。运行此管道的机器是否在做其他工作?如果是这样,则它们可能在 CPU、内存或网络方面过载,无法发送 modifyAckDeadline 请求并且无法及时处理消息。

消息批处理也可能会影响您确认消息的能力。作为一种优化,Pub/Sub 系统存储批量消息而不是单个消息的确认。因此,必须确认批次中的所有消息才能确认所有消息。因此,如果您在一个批次中有 5 条消息并确认其中的 4 条,但没有确认最后一条消息,则所有 5 条消息都将被重新发送。有一些缓存可以尽量减少这种情况,但这仍然是一种可能性。有一个Medium post 对此进行了更详细的讨论(请参阅“消息重新传递和重复率”部分)。一旦收到消息并在调用acknack 之前打印出消息ID,可能值得检查代码中的所有消息是否已确认和未确认。如果您的消息是分批发布的,则单个 nack 可能会导致重新发送更多消息。

批处理和重复之间的这种耦合是我们正在积极努力改进的。我希望这个问题会在某个时候停止。同时,如果您对发布者有控制权,可以将批处理设置中的max_messages 属性设置为1,以防止消息批处理。

如果这些都没有帮助,最好打开一个支持案例并提供一些重复消息的项目名称、订阅名称和消息 ID。工程师可以更详细地调查个别消息被重新传递的原因。

【讨论】:

根据处理结果,我正在处理大量消息。这个想法是将它们发送回队列并稍后尝试再次处理它们。根据你所说的,这显然是我有所有这些重复的原因。我正在处理一条消息,而一堆其他消息因此被删除。我想知道为什么这是 Pub/Sub 设计的一部分? 这实际上是我们正在积极努力改进的地方。我希望这个问题会在某个时候停止。同时,如果您可以控制发布者,则可以将 max_messages 属性设置为 1 以防止消息批处理。我也会用这个更新我的答案。 我的意思是在发布端设置 maxMessages,而不是在订阅端。抱歉,我的链接发错地方了。 This is the maxMessages you want to set。用更好的链接更新了我的答案。 是的,想通了。但它会影响发布者的吞吐量吗? 看起来会......而且它也会影响订阅者的吞吐量......这是一个相当大的权衡

以上是关于为啥我会收到 50% 的 GCP Pub/Sub 消息重复?的主要内容,如果未能解决你的问题,请参考以下文章

验证数据已发送到 GCP Pub/Sub

确认后 GCP 消息保留在 Pub/Sub 中

gcp 云函数 pub/sub 主题死信

python Python:订阅GCP Pub / Sub

python Python:订阅GCP Pub / Sub

如何将 blob 文件发布到 GCP Pub/Sub?