如何从 GCP 存储桶中读取 Apache Beam 中的多个文件
Posted
技术标签:
【中文标题】如何从 GCP 存储桶中读取 Apache Beam 中的多个文件【英文标题】:How to read multiple files in Apache Beam from GCP bucket 【发布时间】:2020-03-04 23:09:23 【问题描述】:我正在尝试使用 Apache Beam 在 GCP 中的多个文件上读取和应用一些子集。我准备了两个仅适用于一个文件的管道,但是当我在多个文件上尝试它们时失败了。除此之外,如果可能的话,我会很方便地将我的管道组合成一个,或者有办法编排它们以便它们按顺序工作。现在管道在本地工作,但我的最终目标是使用 Dataflow 运行它们。
我 textio.ReadFromText 和 textio.ReadAllFromText,但在多个文件的情况下我无法使两者都工作。
def toJson(file):
with open(file) as f:
return json.load(f)
with beam.Pipeline(options=PipelineOptions()) as p:
files = (p
| beam.io.textio.ReadFromText("gs://my_bucket/file1.txt.gz", skip_header_lines = 0)
| beam.io.WriteToText("/home/test",
file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))
with beam.Pipeline(options=PipelineOptions()) as p:
lines = (p
| 'read_data' >> beam.Create(['test-00000-of-00001.json'])
| "toJson" >> beam.Map(toJson)
| "takeItems" >> beam.FlatMap(lambda line: line["Items"])
| "takeSubjects" >> beam.FlatMap(lambda line: line['data']['subjects'])
| beam.combiners.Count.PerElement()
| beam.io.WriteToText("/home/items",
file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))
这两个管道对于单个文件效果很好,但我有数百个相同格式的文件,想利用并行计算的优势。
有没有办法让这个管道适用于同一目录下的多个文件?
是否可以在单个管道中执行此操作而不是创建两个不同的管道? (将文件从桶中写入工作节点并不方便。)
【问题讨论】:
文件名中是否有一些元数据您试图保留到您正在写入的文件名? textio 支持 glob 模式,可以直接处理压缩类型。 @RezaRokni,感谢您的评论。你能给这个用例举个例子吗?我不明白。里面没有元数据。 我可能不了解您的用例,但您可以将 glob 模式与您的 text.io.ReadFromText("gs://my_bucket/*.txt') 一起使用。然后使用您的 beam.Map (toJson)。 beam 抱怨它需要读取 num_bytes 并且当我在读取中提供 num_bytes 时它说 JsonDecode 错误。 啊抱歉,我浏览了您的示例并错过了您尝试读取整个文件,而不是从中读取行。您是否已经尝试过使用 fileio 而不是 textio? textio 从由换行符分隔的文件中读取行。 fileio 生成代表文件及其元数据的记录集合 【参考方案1】:我解决了如何使其适用于多个文件,但无法使其在单个管道中运行。我使用了 for 循环,然后使用了 beam.Flatten 选项。
这是我的解决方案:
file_list = ["gs://my_bucket/file*.txt.gz"]
res_list = ["/home/subject_test_-00000-of-00001.json".format(i) for i in range(len(file_list))]
with beam.Pipeline(options=PipelineOptions()) as p:
for i,file in enumerate(file_list):
(p
| "Read Text ".format(i) >> beam.io.textio.ReadFromText(file, skip_header_lines = 0)
| "Write TExt ".format(i) >> beam.io.WriteToText("/home/subject_test_".format(i),
file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))
pcols = []
with beam.Pipeline(options=PipelineOptions()) as p:
for i,res in enumerate(res_list):
pcol = (p | 'read_data_'.format(i) >> beam.Create([res])
| "toJson_".format(i) >> beam.Map(toJson)
| "takeItems_".format(i) >> beam.FlatMap(lambda line: line["Items"])
| "takeSubjects_".format(i) >> beam.FlatMap(lambda line: line['data']['subjects']))
pcols.append(pcol)
out = (pcols
| beam.Flatten()
| beam.combiners.Count.PerElement()
| beam.io.WriteToText("/home/items",
file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))
【讨论】:
使用ReadAllFromText
并将PCollection 作为input 传递不是更容易吗?
@GuillemXercavins,感谢您的评论,我无法使用 toJson 函数。我仍在尝试在一个管道而不是两个管道中完成所有这些工作。为此,我在“toJson”函数中尝试了 apache_beam.io.gcp.gcsfilesystem,但 read() 失败说我必须给出“num_bytes”,当我将 json 放入其中时不会发生这种情况。
@JonsiBillups 在这种情况下你如何触发你的管道。我正在做类似的事情,但是当我执行 pipeline.run() 时,由于某种原因,它进入了无限循环。
@AmrutaDeshmukh,我通过 GCP 上的 Dataflow 运行器触发运行,见下文:link以上是关于如何从 GCP 存储桶中读取 Apache Beam 中的多个文件的主要内容,如果未能解决你的问题,请参考以下文章