我怎样才能使用 Python 从 Google Pub/Sub 中以足够快的速度阅读

Posted

技术标签:

【中文标题】我怎样才能使用 Python 从 Google Pub/Sub 中以足够快的速度阅读【英文标题】:How could I read fast enough from Google Pub/Sub using Python 【发布时间】:2020-05-04 14:55:13 【问题描述】:

我正在尝试从实时公共 projects/pubsub-public-data/topics/taxirides-realtime 流中读取消息,但我处理数据的速度似乎不够快,或者存在确认问题。 “未确认的消息计数”不断增加我正在做的任何事情(即使我在运行代码之前清除了消息)。我尝试在我的家用 Windows 10 PC、基于 GCP 的 Ubuntu VM 和 GCP 控制台终端上运行相同的代码,结果相同。

附加信息:在我的一个 GCP 项目中,我为公共项目/pubsub-public-data/topics/taxirides-realtime PubSub 主题和我的应用程序创建了一个订阅“taxi-ride-client”。消息正在到达我的程序,但处理缓慢或不正确。

是我做错了什么,还是 Python 太慢了?这是我的代码:

import os
from google.cloud import pubsub_v1

def callback(message):
    ''' Processing PubSub messages '''
    message.ack()

if __name__ == '__main__':

    project_name = '<projectname>'
    credfile = '<credfilename>.json'
    subscription_name = 'taxi-ride-client'

    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credfile

    subscriber = pubsub_v1.SubscriberClient()
    subscription = subscriber.subscription_path(project_name, subscription_name)
    subscr_future = subscriber.subscribe(subscription, callback=callback)
    print('Listening for messages via: '.format(subscription))

    try:
        subscr_future.result(timeout=600)   # running for 10 minutes
    except Exception as ex:
        subscr_future.cancel()

    print('\nNormal program termination.\n')

流每小时产生大约 8 到 1000 万条记录,其中不到 0.5% 与我的回调中的 IF 条件匹配。无论如何,我还尝试了一个只包含确认行的完全空的回调。

我还以 5 个单独的副本运行这个小程序,以便从同一个订阅中读取,但即使在这种情况下,我也无法有所作为。这表明我对确认有疑问。

我做错了什么?

顺便说一下,我使用 GC DataFlow 实现了解决方案,第一步是从 PubSub 主题中读取,并且在 Python 下运行良好。那是一个不同的库和不同的架构。但它每小时可以轻松处理 9 000 000 条消息。

我仍然很好奇,这应该如何使用 python 和纯 PubSub(没有 Beam)来完成。

(更新)

复制

    使用名称创建的 GCP 项目:&lt;your-test-project&gt; 使用项目/所有者角色和以 JSON 格式下载的凭据文件创建服务帐户文件 在命令行中创建订阅:gcloud pubsub subscriptions create projects/&lt;your-test-project&gt;/subscriptions/taxi-ride-client --topic=projects/pubsub-public-data/topics/taxirides-realtime --ack-deadline=60 --message-retention-duration=6h 带有 google-cloud-pubsub 的 Python 3.7 虚拟环境(1.1.0 版) 替换&lt;projectname&gt;&lt;credfilename&gt;后运行代码。源代码here

加博

【问题讨论】:

您是否进行过任何基准测试/分析?有什么方法可以让我们自己运行程序(参见:minimal reproducible example)? 我在原帖末尾添加了复制步骤。 您可以在订阅下添加多个消费者,这应该允许以更高的吞吐量并行处理数据。您仍然应该尝试验证您是否能够像使用一个消费者一样使用消息。 【参考方案1】:

由于 Python 运行时在多线程处理方面固有的限制,很难在 Cloud Pub/Sub 中实现高吞吐量。 Dataflow 在其从 Pub/Sub 读取的实现中不使用 Python,因此它不受此类限制。对于具有多核的单台机器,Java 和 Go 往往具有更好的性能特征,因此一种选择是切换语言。或者,您必须水平扩展并启动更多客户端实例,以便您可以并行处理更多数据。您可能会发现blog post on client library performance 很有趣。

【讨论】:

【参考方案2】:

比赛后期,但是:

    您是否考虑过延长截止日期?您的客户端代码显示未来 10 分钟会超时,但 PubSub 仍会在 1 分钟后取消它。为后者尝试 600 秒。 可以选择多个消费者,但是您需要实现同步拉取与带回调的异步。

考虑到您在处理拉取时的延迟,这可能是一个更好的选择。您可以批量消费正在流式发布的内容(Pub-Sub 的目的)。

在实现多线程之前 - 或者如果消息处理受 CPU 限制,则在实现多处理之前 - 从单个 sub pull 开始并首先使用消息计数,然后在需要时添加线程/进程。

【讨论】:

以上是关于我怎样才能使用 Python 从 Google Pub/Sub 中以足够快的速度阅读的主要内容,如果未能解决你的问题,请参考以下文章

使用Python for循环,我怎样才能每次都从一个字符串中取出一个字母而没有任何重复

Python:Celery,我怎样才能让它在后台运行?

我怎样才能 TextView 文本从一个地方飞到特定地方并反转?

怎么样才能上谷歌

一台电脑上做了代理 怎样才能做到局域网其他电脑都可以通过这台电脑 访问google

中国如何访问谷歌?怎样才能访问谷歌