使用 Python / Apache Beam 进行 Google Cloud Storage 并发控制?
Posted
技术标签:
【中文标题】使用 Python / Apache Beam 进行 Google Cloud Storage 并发控制?【英文标题】:Google Cloud Storage Concurrency Control with Python / Apache Beam? 【发布时间】:2018-08-30 22:48:02 【问题描述】:我正在使用 Python 中的 Apache Beam 构建管道,并且在写入 Google Cloud Storage 中的文件时需要避免竞争条件。
以下链接介绍了如何使用 gsutil
在 Google Cloud Storage 中使用并发控制。
https://cloud.google.com/storage/docs/gsutil/addlhelp/ObjectVersioningandConcurrencyControl#concurrency-control
有谁知道是否有办法使用 Python 或 Apache Beam Python SDK 来完成同样的事情?
【问题讨论】:
【参考方案1】:如果您需要按顺序执行某些操作,最好的办法是按键进行分组,将它们组合在一起。
例如,如果您有两个不同的元素写入同一个 GCS 文件,您可能需要执行以下操作:
(my_collection | beam.Map(lambda x: (x['filename'], x))
| beam.GroupByKey()
| beam.Map(write_each_value))
通过执行GroupByKey
,您可以确保具有相同文件名的元素进入同一个worker,并按顺序操作。
【讨论】:
FWIW 如果您分享有关您的用例的更多信息,我或许可以为您提供更量身定制的答案。 这里有更多背景知识:我有一个 IoT 设备,它大约每 30 分钟向 GCS 发送数据包。该数据以二进制形式发送。我的目标是去 GCS,将二进制文件解析为 CSV,按 device_id 和日期分组,然后最终得到一个 CSV,其中包含给定用户在给定日期的所有数据。由于我一直在测试管道,我一直在使用批处理,但我最终将使用流式处理。在流式传输的情况下,如何保证两个工作人员不会同时尝试写入同一个文件? 我明白你的意思。有一个线程来监视新文件的目录并发出事件是否合理?像这样:beam.Create(['gs://my_directory']) | beam.FlatMap(monitor_directory_continuously) | beam.Reshuffle() | MyOperationPerFile()
。 - 该管道将在单个线程中监视目录,并将文件名重新洗牌给其他工作人员进行分析以上是关于使用 Python / Apache Beam 进行 Google Cloud Storage 并发控制?的主要内容,如果未能解决你的问题,请参考以下文章
无法使用 Apache Beam(Python SDK)读取 Pub/Sub 消息
在 python Apache Beam 中打开一个 gzip 文件
如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表
使用 Python 处理 Apache Beam 管道中的异常