Google PubSub 如何在流式拉取消息时处理搜索到的消息

Posted

技术标签:

【中文标题】Google PubSub 如何在流式拉取消息时处理搜索到的消息【英文标题】:Google PubSub how to process seeked messages whilst streaming-pull messages 【发布时间】:2021-06-19 10:13:29 【问题描述】:

我正在尝试在 Python 中处理 seek(timestamp) 返回的消息。

我使用流式拉取订阅了非搜索消息:

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
future = subscriber.subscribe(subscription_path, callback=callback())

with subscriber:
    try:
        future.result()
    except TimeoutError:
        future.cancel()

def callback(msg):
    msg.ack()

我找到了SeekRequest 对象:

https://googleapis.dev/python/pubsub/2.4.0/types.html

我期待seek() 再次通过流式拉取接收重播消息。但是,有一个 SeekResponse 对象。

是否可以通过流式拉取回调将搜索到的消息转移到到达?

【问题讨论】:

【参考方案1】:

搜索响应不包含任何消息。事实上,这是一个空洞的回应。 RPC 有响应,因此您可以知道搜索是否有任何错误,例如在不存在的订阅上执行。 Seek 导致重播的消息以与常规传递相同的路径作为消息重新传递给订阅者,例如,再次通过流式拉取接收它们。

【讨论】:

请问,Python 中的 seek() 时间戳参数是什么格式?字符串,UTC 整数? API 只是说“时间戳”,我找不到任何示例。不确定我实际上需要传递什么。

以上是关于Google PubSub 如何在流式拉取消息时处理搜索到的消息的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Flask Server 上的 Google Pubsub 订阅回调中屈服以进行流式传输

pubsub 流式处理 pull nack 与无确认行为

Pubsub 拉取订阅和并发

Google PubSub 每次拉取的最大消息数

Google pubsub_v1 订阅者拉“打开的文件太多”

使用 Python SDK 进行数据流流式处理:将 PubSub 消息转换为 BigQuery 输出