GCP PubSub:Python 中的同步拉取订阅者?

Posted

技术标签:

【中文标题】GCP PubSub:Python 中的同步拉取订阅者?【英文标题】:GCP PubSub: Synchronous Pull Subscriber in Python? 【发布时间】:2018-06-20 04:04:31 【问题描述】:

全部,

我正在尝试学习如何使用 GCP PubSub,并且我可以通过 CLI 命令(创建主题、订阅、发布到主题、从订阅中提取等)对其进行测试,但是当我跳转时转到 python(v 2.7,当前公司标准)我正在努力以同步方式提取消息。

我已经查看了这个 url,它告诉你去睡觉和 While True,但我无法想象在现实世界中有人会这样做,对吧? https://cloud.google.com/pubsub/docs/quickstart-client-libraries#pubsub-subscribe-python

这个 url 告诉你可以使用 future.result(),我试过了,但它不会像你想的那样阻塞会话/线程: http://google-cloud-python.readthedocs.io/en/latest/pubsub/subscriber/index.html

有人有其他想法吗?这是我的函数,它几乎直接来自示例之一:

def sample_receive_messages(subscription_name="my-sub", project=None):
"""Receives messages from a pull subscription."""
if not project:
    credentials, project = google.auth.default()  

subscriber = psub.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_name)

def callback(message):
    # print('Received message: '.format(message))
    message.ack()
    print('<message>' + str(message.data) + '</message>')

subscription = subscriber.subscribe(subscription_path)

future = subscription.open(callback)

myResult = future.result()

subscription.close()

print("done")

最后我的目标是有一个进程经常唤醒,抓取消息并确认它们,将消息写入文件,然后结束。

到目前为止,该进程读取消息并将它们打印出来(很棒),但它会坐下来再坐下来,最后会出现一些乱码:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pubSubTools.py", line 50, in sample_receive_messages
    myResult = future.result()
  File "/usr/local/lib/python2.7/site-packages/google/cloud/pubsub_v1/futures.py", line 98, in result
    raise err
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>

【问题讨论】:

其他客户端(Ruby、php 和命令行界面)似乎不必被告知不要等待,这正是我希望 python 选项所做的。 【参考方案1】:

这在library documentation中有所描述:

from google.cloud import SubscriberClient

pubsub_client = SubscriberClient()
subscription_path = 'projects/project/subscriptions/subscription'.format(project=project_name, subscription=subscription_name)
pull_response= pubsub_client.pull(subscription=subscription_path, max_messages=10)

for msg in pull_response.received_messages:
    message = msg.message.data.decode('utf-8')
    # do your thing
    pubsub_client.acknowledge(subscription_path, [msg.ack_id])

【讨论】:

【参考方案2】:

Python 库不明确支持 Cloud Pub/Sub API 中的同步拉取。 future.result() 是推荐的阻塞方式,但它仍然是异步拉取的。

我建议您尝试使用官方的Python Queue Class,其中回调调用 Queue.put(),然后使用 Queue.get() 消费消息。

【讨论】:

感谢您的回复 Katayoo​​n,我试过了,但仍然无法找到真正的“同步”解决方案,所以我想我只是跳出 shell 并调用 CLI并以 JSON 格式获取标准输出并进行处理。我只是用 python 客户端像 CLI 一样工作,它真的很简单。 gcloud pubsub 订阅 pull --auto-ack my-sub --format=json 建议发布您的解决方案作为答案,以更好地帮助社区。​​span>

以上是关于GCP PubSub:Python 中的同步拉取订阅者?的主要内容,如果未能解决你的问题,请参考以下文章

GCP Nodejs8 云功能 - 同步 PubSub 发布

GCP Pubsub 主题持续时间中存在的消息数

GCP 数据流的推送与拉取

拉取请求上的 GCP Pub/Sub 过滤

GCP Pubsub 中的消息丢失和重复

GCP PubSub 主题推送问题