Google Cloud Pub/Sub Python SDK 一次检索单个消息

Posted

技术标签:

【中文标题】Google Cloud Pub/Sub Python SDK 一次检索单个消息【英文标题】:Google Cloud Pub/Sub Python SDK retrieve single message at a time 【发布时间】:2018-10-27 22:32:51 【问题描述】:

问题:我的用例是我想从 Google Cloud Pub/Sub 接收消息 - 使用 Python Api 一次一条消息。当前的所有示例都提到使用 Async/callback 选项从 Pub/Sub 订阅中提取消息。这种方法的问题是我需要保持线程活着。

是否可以只接收 1 条消息并关闭连接,即是否有一项功能可以让我将参数(类似于 max_messages)设置为 1,以便在收到 1 条消息后线程终止?

文档here 没有列出任何关于 Python 同步拉取的内容,它似乎对 Java 等其他语言有 num_of_messages 选项。

【问题讨论】:

【参考方案1】:

根据官方文档here:

...您可以实现对 Pub/Sub 消息流的一次性处理, 因为 PubsubIO 基于自定义消息标识符对消息进行重复数据删除 或 Pub/Sub 分配的标识符。

因此,您应该能够使用记录 ID(即消息的标识符)来允许跨 Dataflow 和其他系统之间的边界进行一次性处理。要使用记录 ID,您可以在构造 PubsubIO.Read 或 PubsubIO.Write 转换时调用 idLabel,并传递您选择的字符串值。在java中这将是:

public PubsubIO.Read.Bound<T> idLabel(String idLabel)

这会返回一个与此类似的转换,但它会从给定的消息属性中读取唯一的消息 ID。

【讨论】:

我没有在 Dataflow 中使用它,也没有使用 Java【参考方案2】:

参见下面的例子in this link:

from google.cloud import pubsub_v1

client = pubsub_v1.SubscriberClient()
subscription = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
max_messages = 1

response = client.pull(subscription, max_messages)
print(response)

我已经尝试过自己并使用它一次收到一条消息。

如果您遇到错误,请尝试将 pubsub 库更新到最新版本:

pip install --upgrade google-cloud-pubsub

In docs here你有更多关于代码sn-p中使用的拉取方法的信息:

Pull 方法依赖于请求/响应模型:

应用程序发送消息请求。服务器回复 零个或多个消息并关闭连接。

【讨论】:

以上是关于Google Cloud Pub/Sub Python SDK 一次检索单个消息的主要内容,如果未能解决你的问题,请参考以下文章

Google Cloud Pub/Sub - Cloud Function & Bigquery - 数据插入未发生

Google Cloud 上使用 Pub/Sub 的主/从模式

如何修改后台 Cloud Function 的 Google Cloud Pub/Sub 订阅确认截止日期

Google Cloud Pub/Sub 确认

Google Cloud Pub/Sub 获取或创建订阅

Google Cloud Functions 无法使用 Pub/Sub 死信主题