使用 Python / Apache Beam 进行 Google Cloud Storage 并发控制?

Posted

技术标签:

【中文标题】使用 Python / Apache Beam 进行 Google Cloud Storage 并发控制?【英文标题】:Google Cloud Storage Concurrency Control with Python / Apache Beam? 【发布时间】:2018-08-30 22:48:02 【问题描述】:

我正在使用 Python 中的 Apache Beam 构建管道,并且在写入 Google Cloud Storage 中的文件时需要避免竞争条件。

以下链接介绍了如何使用 gsutil 在 Google Cloud Storage 中使用并发控制。

https://cloud.google.com/storage/docs/gsutil/addlhelp/ObjectVersioningandConcurrencyControl#concurrency-control

有谁知道是否有办法使用 Python 或 Apache Beam Python SDK 来完成同样的事情?

【问题讨论】:

【参考方案1】:

如果您需要按顺序执行某些操作,最好的办法是按键进行分组,将它们组合在一起。

例如,如果您有两个不同的元素写入同一个 GCS 文件,您可能需要执行以下操作:

(my_collection | beam.Map(lambda x: (x['filename'], x))
               | beam.GroupByKey()
               | beam.Map(write_each_value))

通过执行GroupByKey,您可以确保具有相同文件名的元素进入同一个worker,并按顺序操作。

【讨论】:

FWIW 如果您分享有关您的用例的更多信息,我或许可以为您提供更量身定制的答案。 这里有更多背景知识:我有一个 IoT 设备,它大约每 30 分钟向 GCS 发送数据包。该数据以二进制形式发送。我的目标是去 GCS,将二进制文件解析为 CSV,按 device_id 和日期分组,然后最终得到一个 CSV,其中包含给定用户在给定日期的所有数据。由于我一直在测试管道,我一直在使用批处理,但我最终将使用流式处理。在流式传输的情况下,如何保证两个工作人员不会同时尝试写入同一个文件? 我明白你的意思。有一个线程来监视新文件的目录并发出事件是否合理?像这样:beam.Create(['gs://my_directory']) | beam.FlatMap(monitor_directory_continuously) | beam.Reshuffle() | MyOperationPerFile()。 - 该管道将在单个线程中监视目录,并将文件名重新洗牌给其他工作人员进行分析

以上是关于使用 Python / Apache Beam 进行 Google Cloud Storage 并发控制?的主要内容,如果未能解决你的问题,请参考以下文章

无法使用 Apache Beam(Python SDK)读取 Pub/Sub 消息

在 python Apache Beam 中打开一个 gzip 文件

如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表

使用 Python 处理 Apache Beam 管道中的异常

使用 Python / Apache Beam 进行 Google Cloud Storage 并发控制?

如何使用 Apache Beam Python 将输出写入动态路径