如何使用 Python 从 Google Pub/Sub 可靠地提取消息?

Posted

技术标签:

【中文标题】如何使用 Python 从 Google Pub/Sub 可靠地提取消息?【英文标题】:How to pull messages from Google Pub/Sub reliably using Python? 【发布时间】:2021-11-14 04:20:48 【问题描述】:

因此,即使该主题中有大量未决消息,pull 方法有时也会返回 0 条消息。我做错了吗?

import os
from google.cloud import pubsub
import ast

PROJECT_ID = os.environ['PROJECT_ID']
subscriber = pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT_ID, 'subscription-name')

while True:
  response = subscriber.pull(
    request=
      "subscription": subscription_path,
      "max_messages": 50,
    
  )

  if not response.received_messages:
    print('❌ no messages in pub/sub')
    break
  
  for msg in response.received_messages:
    
    message_data = ast.literal_eval(msg.message.data.decode('utf-8'))
    # transform data and publish to another topic

  ack_ids = [msg.ack_id for msg in response.received_messages]
  subscriber.acknowledge(
    request=
      "subscription": subscription_path,
      "ack_ids": ack_ids,
    
  )

print('???? No more messages left in the queue. Shutting down...')

【问题讨论】:

【参考方案1】:

这并不意外,pub/sub 是一个复杂的分布式系统,它不保证交付时间、排序、复制……

请注意文档 (https://cloud.google.com/pubsub/docs/pull) 中的相关段落,以便更好地处理消息。

请注意,通过同步实现低消息传递延迟 拉力,重要的是同时拥有许多出色的拉力 要求。随着主题吞吐量的增加,更多的拉取请求 是必要的。一般来说,异步拉取更适合于 对延迟敏感的应用程序。

【讨论】:

【参考方案2】:

返回零消息并不能很好地指示是否有消息可用。该服务尝试快速向用户返回消息,如果没有立即可用的消息,它可能会返回比请求消息更少甚至为零消息的响应。一次发送单个拉取请求将使消息不太可能被轻松缓存并可以快速发送给客户端。

接收消息的最佳方式是使用asynchronous pull via the client libraries。客户端库使用流式拉取,这意味着与服务器存在持久连接,并且可以在消息可用时立即传递。您可以使用异步订阅者并跟踪自收到最后一条消息以来的时间量,如果已经过了足够的时间,请关闭订阅者。但是,由于 Cloud Pub/Sub 中没有端到端延迟保证,因此仍有可能无法在这样的时间范围内传递消息。

或者,如果您必须使用同步拉取,那么您将需要遵循synchronous pull documentation 中的指南:“请注意,要通过同步拉取实现低消息传递延迟,同时有许多未完成的拉取请求非常重要。随着主题吞吐量的增加,需要更多的拉取请求。”这样,总是有未完成的请求到服务器准备接收消息。

理想情况下,让订阅者持续运行,以便在消息发布时可以快速接收。如果您想在没有要处理的消息时将订阅者扩展到零资源,请考虑使用Google Cloud Functions with Pub/Sub。

【讨论】:

在我看来,pub/sub 并不是我用例的最佳工具。我正在尝试每月排队约 10.000.000 条文本用于 NLP 处理。处理一条记录大约需要 0.5 秒。我想我可以将 pub/sub 用作队列,并拥有 3-instances-large Compute Instances Group。输入数据(文本片段)不断来自其他数据源。 如果输入持续不断,那么 Pub/Sub 是您正在做的事情的正确工具。只是您需要以一种不断尝试获取消息的方式进行订阅。如果您打算在订阅者收到零个消息时停止它,那么您打算如何在有消息要处理时重新启动它?如果数据不连续并且需要以批处理方式进行更多处理,那么写入 GCS 并将计算实例作为 cron 作业启动可能更有意义。

以上是关于如何使用 Python 从 Google Pub/Sub 可靠地提取消息?的主要内容,如果未能解决你的问题,请参考以下文章

我怎样才能使用 Python 从 Google Pub/Sub 中以足够快的速度阅读

如何从前端移动客户端发布到 Google Pub/Sub?

如何从 Google Pub/Sub 获取 objectId、bucketId 等

Google Cloud Pub/Sub Python SDK 一次检索单个消息

Google Cloud 上使用 Pub/Sub 的主/从模式

从浏览器发布 Google Cloud Pub/Sub - 身份验证如何工作?