如何在 google Pub/Sub 中快速发布多条消息?
Posted
技术标签:
【中文标题】如何在 google Pub/Sub 中快速发布多条消息?【英文标题】:How to publish multiple messages in google Pub/Sub fast? 【发布时间】:2021-11-17 07:10:00 【问题描述】:如何快速发布多条消息到 pubsub?没有多处理和多线程,因为代码已经在一个线程中
下面的代码每秒发布 40 条消息
publisher = pubsub.PublisherClient(
credentials=credentials,
batch_settings=types.BatchSettings(
max_messages=1000, # default is 100
max_bytes=1 * 1000 * 1000, # 1 MiB
max_latency=0.1, # default is 10 ms
)
)
topic_name = 'projects/project_id/topics/topic'.format(
project_id=PROJECT_ID,
topic=TOPIC_PUBSUB,
)
for data in results:
bytes_json_data = str.encode(json.dumps(data))
future = publisher.publish(topic_name, bytes_json_data)
future.result()
【问题讨论】:
每秒 40 条消息并不是那么快。你的批处理配置应该足够了。你有什么问题? 【参考方案1】:取出:
future.result()
别这样:
for data in results:
bytes_json_data = str.encode(json.dumps(data))
future = publisher.publish(topic_name, bytes_json_data)
发布 10k 条消息应该花费不到一秒钟的时间
【讨论】:
【参考方案2】:不要一次发布一条消息,然后等待future
,您应该一次发布所有消息,然后等待最后发布的futures
。它看起来像:
from concurrent import futures
...
publish_futures = []
for data in results:
bytes_json_data = str.encode(json.dumps(data))
future = publisher.publish(topic_name, bytes_json_data)
publish_futures.append(future)
...
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
有一个带有示例代码的detailed example in the docs。
【讨论】:
以上是关于如何在 google Pub/Sub 中快速发布多条消息?的主要内容,如果未能解决你的问题,请参考以下文章
如何从 Firebase Cloud Function 在 Google Pub/Sub 中发布消息?
Google Pub/Sub 的 RetryPolicy 中配置的指数退避如何工作?
将 BigQuery 表流式传输到 Google Pub/Sub
如何在 Google Cloud Function 上的 Spring Cloud 函数中获取 Pub/Sub 事件的元数据