如何使用当前的 pubsub 订阅者从 google Pub/Sub 系统获取消息

Posted

技术标签:

【中文标题】如何使用当前的 pubsub 订阅者从 google Pub/Sub 系统获取消息【英文标题】:How to get messages from googles Pub/Sub sytsem by using the current pubsub subsciber 【发布时间】:2019-06-09 09:09:58 【问题描述】:

我需要使用基于 python 的订阅者从 googles Pub/Sub 系统接收已发布的消息。

为此,我做了以下步骤:

在 Web 控制台上,我创建了一个项目、一个注册表、一个遥测主题、一个设备,并将订阅主题附加到遥测主题 我的代码可以通过 mqtt 桥发布消息以及 pubsub 库的发布功能 我可以使用以下 cmd 在终端上提取此消息:
gcloud pubsub subscriptions pull --auto-ack projects/project_id/subscriptions/subscription_topic

在下面你会看到我的代码中重要的 sn-p。它基于 git-examples,但在 google-cloud-pubsub 包的 0.39.1 版本中似乎不再存在某些功能。一个例子是subscriber.subscription_path() 方法。

def receive_messages(subscription_path, service_account_json):
    import time
    from google.cloud import pubsub_v1
    subscriber = pubsub_v1.SubscriberClient(credentials=service_account_json)

    #subscription_path = subscriber.subscription_path(
    #   project_id, subscription_name)

    def callback(message):
        print('Received message: '.format(message))
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback)

    print('Listening for messages on '.format(subscription_path))
    while True:
        time.sleep(60)

当我运行这个函数时,无数线程在后台一点一点启动,但似乎没有一个线程退出或启动回调函数。

我希望安装了所有要求:
pip3 freeze

asn1crypto==0.24.0
cachetools==3.0.0
certifi==2018.11.29
cffi==1.11.5
chardet==3.0.4
cryptography==2.4.2
google-api-core==1.7.0
google-api-python-client==1.7.5
google-auth==1.6.2
google-auth-httplib2==0.0.3
google-auth-oauthlib==0.2.0
google-cloud-bigquery==1.8.1
google-cloud-core==0.29.1
google-cloud-datastore==1.7.3
google-cloud-monitoring==0.31.1
google-cloud-pubsub==0.39.1
google-resumable-media==0.3.2
googleapis-common-protos==1.5.6
grpc-google-iam-v1==0.11.4
grpcio==1.17.1
httplib2==0.12.0
idna==2.8
keyring==10.1
keyrings.alt==1.3
oauthlib==3.0.0
paho-mqtt==1.4.0
protobuf==3.6.1
pyasn1==0.4.5
pyasn1-modules==0.2.3
pycparser==2.19
pycrypto==2.6.1
pycurl==7.43.0
pygobject==3.22.0
PyJWT==1.6.4
python-apt==1.4.0b3
pytz==2018.9
pyxdg==0.25
redis==3.0.1
requests==2.21.0
requests-oauthlib==1.2.0
RPi.GPIO==0.6.5
rsa==4.0
SecretStorage==2.3.1
six==1.12.0
unattended-upgrades==0.1
uritemplate==3.0.0
urllib3==1.24.1
virtualenv==16.2.0
我在 debian 上以及 Windows 10 上运行该代码并更新了 gcloud:
gcloud components update

在过去的一周里,我一直在尝试不同的解决方案,或者开始使用看似过时的谷歌示例。此外,似乎比代码示例更早的文档也无济于事。所以我希望这里有人可以帮助我最终通过 Pub/Sub-Sytsem 接收基于 python 的客户端消息。

我希望我能提供最重要的信息,并提前感谢您为帮助我所做的努力。

【问题讨论】:

也许这对你的github.com/googleapis/google-cloud-python有帮助 您好 Tamir,感谢您的帮助,但不幸的是,我已经知道这个存储库并且已经尝试保留其中描述的示例,但不幸的是它导致了完全相同的行为。我开始复制以前创建的主题,然后在后台启动了无限数量的线程,没有收到任何消息,也没有调用消息的回调函数。但是,如果我使用 gcloud 工具,我可以订阅并接收同一主题下的消息。最好的问候 【参考方案1】:

python 文档站点here 上维护的示例应该是最新的。在运行任何代码之前,请确保您已遵循“为了使用此库,您首先需要完成以下步骤”部分中的所有步骤。特别是,您可能没有正确设置身份验证,我认为您不应该手动传递凭据路径。

【讨论】:

您好丹尼尔,非常感谢您的帮助。当然,遵循“为了使用此库,您首先需要执行以下步骤”中列出的步骤。我还可以使用下面的“Cloud Tools for PowerShell”发布和订阅。 # Publish: gcloud pubsub topics publish projects / project_id / topics / topic_id --message = "Test Message" # Pull subsirbe: gcloud pubsub subscriptions pull --auto-ack projects / project_id / subscriptions / subscription_id 我已经创建了一个订阅客户端和一个公共客户端,如以下文档中所述:googleapis.github.io/google-cloud-python/latest/pubsub # 发布者客户端将无限期挂在调用“future = publisher. publish" (topic_name, b'My first message !, "spam = 'eggs')" 没有发布过消息。 # 订阅者客户端要么在函数“subscription_respond =subscriber.create_subscription() 中无限期保留name = subscription_name, topic = topic_name)”,要么在没有这个函数的情况下启动future =subscriber.subscribe (subscription_name, callback) 在后台无限多任务而无需调用回调。 快速问题:你说上面是你代码的“重要部分”。在您代码的其他部分的任何时候,您是否正在使用多处理创建子流程?分叉是known 导致grpc 挂起。我的另一个建议是在创建订阅后确保您的待办事项中有未完成的消息,以便订阅者可以阅读。【参考方案2】:
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
print(f"Received message.")
message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, 
callback=callback)
print(f"Listening for messages on subscription_path..\n")


try:

    streaming_pull_future.result(timeout=timeout)
except TimeoutError:
    streaming_pull_future.cancel()  # Trigger the shutdown.
    streaming_pull_future.result()  # Block until the shutdown is 

【讨论】:

您的答案可以通过额外的支持信息得到改进。请edit 添加更多详细信息,例如引用或文档,以便其他人可以确认您的答案是正确的。你可以找到更多关于如何写好答案的信息in the help center。

以上是关于如何使用当前的 pubsub 订阅者从 google Pub/Sub 系统获取消息的主要内容,如果未能解决你的问题,请参考以下文章

Google Pubsub - 接收推送订阅的传递尝试

如何通过 terraform 使用服务帐户创建 google cloud pubsub pull 订阅?

如何通过 terraform 使用服务帐户创建谷歌云 pubsub 订阅?

从 Google PubSub 中提取消息不起作用 - 权限被拒绝

Google PubSub 订阅无法从 StatusCode.UNAVAILABLE [code=8a75] 错误中恢复

将频道与 google pubsub poll 订阅者一起使用