GCP PubSub:Python 中的同步拉取订阅者?
Posted
技术标签:
【中文标题】GCP PubSub:Python 中的同步拉取订阅者?【英文标题】:GCP PubSub: Synchronous Pull Subscriber in Python? 【发布时间】:2018-06-20 04:04:31 【问题描述】:全部,
我正在尝试学习如何使用 GCP PubSub,并且我可以通过 CLI 命令(创建主题、订阅、发布到主题、从订阅中提取等)对其进行测试,但是当我跳转时转到 python(v 2.7,当前公司标准)我正在努力以同步方式提取消息。
我已经查看了这个 url,它告诉你去睡觉和 While True,但我无法想象在现实世界中有人会这样做,对吧? https://cloud.google.com/pubsub/docs/quickstart-client-libraries#pubsub-subscribe-python
这个 url 告诉你可以使用 future.result(),我试过了,但它不会像你想的那样阻塞会话/线程: http://google-cloud-python.readthedocs.io/en/latest/pubsub/subscriber/index.html
有人有其他想法吗?这是我的函数,它几乎直接来自示例之一:
def sample_receive_messages(subscription_name="my-sub", project=None):
"""Receives messages from a pull subscription."""
if not project:
credentials, project = google.auth.default()
subscriber = psub.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
def callback(message):
# print('Received message: '.format(message))
message.ack()
print('<message>' + str(message.data) + '</message>')
subscription = subscriber.subscribe(subscription_path)
future = subscription.open(callback)
myResult = future.result()
subscription.close()
print("done")
最后我的目标是有一个进程经常唤醒,抓取消息并确认它们,将消息写入文件,然后结束。
到目前为止,该进程读取消息并将它们打印出来(很棒),但它会坐下来再坐下来,最后会出现一些乱码:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "pubSubTools.py", line 50, in sample_receive_messages
myResult = future.result()
File "/usr/local/lib/python2.7/site-packages/google/cloud/pubsub_v1/futures.py", line 98, in result
raise err
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>
【问题讨论】:
其他客户端(Ruby、php 和命令行界面)似乎不必被告知不要等待,这正是我希望 python 选项所做的。 【参考方案1】:这在library documentation中有所描述:
from google.cloud import SubscriberClient
pubsub_client = SubscriberClient()
subscription_path = 'projects/project/subscriptions/subscription'.format(project=project_name, subscription=subscription_name)
pull_response= pubsub_client.pull(subscription=subscription_path, max_messages=10)
for msg in pull_response.received_messages:
message = msg.message.data.decode('utf-8')
# do your thing
pubsub_client.acknowledge(subscription_path, [msg.ack_id])
【讨论】:
【参考方案2】:Python 库不明确支持 Cloud Pub/Sub API 中的同步拉取。 future.result() 是推荐的阻塞方式,但它仍然是异步拉取的。
我建议您尝试使用官方的Python Queue Class,其中回调调用 Queue.put(),然后使用 Queue.get() 消费消息。
【讨论】:
感谢您的回复 Katayoon,我试过了,但仍然无法找到真正的“同步”解决方案,所以我想我只是跳出 shell 并调用 CLI并以 JSON 格式获取标准输出并进行处理。我只是用 python 客户端像 CLI 一样工作,它真的很简单。 gcloud pubsub 订阅 pull --auto-ack my-sub --format=json 建议发布您的解决方案作为答案,以更好地帮助社区。span>以上是关于GCP PubSub:Python 中的同步拉取订阅者?的主要内容,如果未能解决你的问题,请参考以下文章