我怎样才能使用 Python 从 Google Pub/Sub 中以足够快的速度阅读
Posted
技术标签:
【中文标题】我怎样才能使用 Python 从 Google Pub/Sub 中以足够快的速度阅读【英文标题】:How could I read fast enough from Google Pub/Sub using Python 【发布时间】:2020-05-04 14:55:13 【问题描述】:我正在尝试从实时公共 projects/pubsub-public-data/topics/taxirides-realtime 流中读取消息,但我处理数据的速度似乎不够快,或者存在确认问题。 “未确认的消息计数”不断增加我正在做的任何事情(即使我在运行代码之前清除了消息)。我尝试在我的家用 Windows 10 PC、基于 GCP 的 Ubuntu VM 和 GCP 控制台终端上运行相同的代码,结果相同。
附加信息:在我的一个 GCP 项目中,我为公共项目/pubsub-public-data/topics/taxirides-realtime PubSub 主题和我的应用程序创建了一个订阅“taxi-ride-client”。消息正在到达我的程序,但处理缓慢或不正确。
是我做错了什么,还是 Python 太慢了?这是我的代码:
import os
from google.cloud import pubsub_v1
def callback(message):
''' Processing PubSub messages '''
message.ack()
if __name__ == '__main__':
project_name = '<projectname>'
credfile = '<credfilename>.json'
subscription_name = 'taxi-ride-client'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credfile
subscriber = pubsub_v1.SubscriberClient()
subscription = subscriber.subscription_path(project_name, subscription_name)
subscr_future = subscriber.subscribe(subscription, callback=callback)
print('Listening for messages via: '.format(subscription))
try:
subscr_future.result(timeout=600) # running for 10 minutes
except Exception as ex:
subscr_future.cancel()
print('\nNormal program termination.\n')
流每小时产生大约 8 到 1000 万条记录,其中不到 0.5% 与我的回调中的 IF 条件匹配。无论如何,我还尝试了一个只包含确认行的完全空的回调。
我还以 5 个单独的副本运行这个小程序,以便从同一个订阅中读取,但即使在这种情况下,我也无法有所作为。这表明我对确认有疑问。
我做错了什么?
顺便说一下,我使用 GC DataFlow 实现了解决方案,第一步是从 PubSub 主题中读取,并且在 Python 下运行良好。那是一个不同的库和不同的架构。但它每小时可以轻松处理 9 000 000 条消息。
我仍然很好奇,这应该如何使用 python 和纯 PubSub(没有 Beam)来完成。
(更新)
复制
-
使用名称创建的 GCP 项目:
<your-test-project>
使用项目/所有者角色和以 JSON 格式下载的凭据文件创建服务帐户文件
在命令行中创建订阅:gcloud pubsub subscriptions create projects/<your-test-project>/subscriptions/taxi-ride-client --topic=projects/pubsub-public-data/topics/taxirides-realtime --ack-deadline=60 --message-retention-duration=6h
带有 google-cloud-pubsub 的 Python 3.7 虚拟环境(1.1.0 版)
替换<projectname>
和<credfilename>
后运行代码。源代码here
加博
【问题讨论】:
您是否进行过任何基准测试/分析?有什么方法可以让我们自己运行程序(参见:minimal reproducible example)? 我在原帖末尾添加了复制步骤。 您可以在订阅下添加多个消费者,这应该允许以更高的吞吐量并行处理数据。您仍然应该尝试验证您是否能够像使用一个消费者一样使用消息。 【参考方案1】:由于 Python 运行时在多线程处理方面固有的限制,很难在 Cloud Pub/Sub 中实现高吞吐量。 Dataflow 在其从 Pub/Sub 读取的实现中不使用 Python,因此它不受此类限制。对于具有多核的单台机器,Java 和 Go 往往具有更好的性能特征,因此一种选择是切换语言。或者,您必须水平扩展并启动更多客户端实例,以便您可以并行处理更多数据。您可能会发现blog post on client library performance 很有趣。
【讨论】:
【参考方案2】:比赛后期,但是:
-
您是否考虑过延长截止日期?您的客户端代码显示未来 10 分钟会超时,但 PubSub 仍会在 1 分钟后取消它。为后者尝试 600 秒。
可以选择多个消费者,但是您需要实现同步拉取与带回调的异步。
考虑到您在处理拉取时的延迟,这可能是一个更好的选择。您可以批量消费正在流式发布的内容(Pub-Sub 的目的)。
在实现多线程之前 - 或者如果消息处理受 CPU 限制,则在实现多处理之前 - 从单个 sub pull 开始并首先使用消息计数,然后在需要时添加线程/进程。
【讨论】:
以上是关于我怎样才能使用 Python 从 Google Pub/Sub 中以足够快的速度阅读的主要内容,如果未能解决你的问题,请参考以下文章
使用Python for循环,我怎样才能每次都从一个字符串中取出一个字母而没有任何重复
我怎样才能 TextView 文本从一个地方飞到特定地方并反转?