如何从 GCP 存储桶中读取 Apache Beam 中的多个文件

Posted

技术标签:

【中文标题】如何从 GCP 存储桶中读取 Apache Beam 中的多个文件【英文标题】:How to read multiple files in Apache Beam from GCP bucket 【发布时间】:2020-03-04 23:09:23 【问题描述】:

我正在尝试使用 Apache Beam 在 GCP 中的多个文件上读取和应用一些子集。我准备了两个仅适用于一个文件的管道,但是当我在多个文件上尝试它们时失败了。除此之外,如果可能的话,我会很方便地将我的管道组合成一个,或者有办法编排它们以便它们按顺序工作。现在管道在本地工作,但我的最终目标是使用 Dataflow 运行它们。

我 textio.ReadFromText 和 textio.ReadAllFromText,但在多个文件的情况下我无法使两者都工作。

def toJson(file):
    with open(file) as f:
        return json.load(f)


 with beam.Pipeline(options=PipelineOptions()) as p:
       files = (p
        | beam.io.textio.ReadFromText("gs://my_bucket/file1.txt.gz", skip_header_lines = 0)
        | beam.io.WriteToText("/home/test",
                   file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))

 with beam.Pipeline(options=PipelineOptions()) as p:
lines = (p  
            | 'read_data' >> beam.Create(['test-00000-of-00001.json'])
            | "toJson" >> beam.Map(toJson)
            | "takeItems" >> beam.FlatMap(lambda line: line["Items"])
            | "takeSubjects" >> beam.FlatMap(lambda line: line['data']['subjects'])
            | beam.combiners.Count.PerElement()
            | beam.io.WriteToText("/home/items",
                   file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))

这两个管道对于单个文件效果很好,但我有数百个相同格式的文件,想利用并行计算的优势。

有没有办法让这个管道适用于同一目录下的多个文件?

是否可以在单个管道中执行此操作而不是创建两个不同的管道? (将文件从桶中写入工作节点并不方便。)

【问题讨论】:

文件名中是否有一些元数据您试图保留到您正在写入的文件名? textio 支持 glob 模式,可以直接处理压缩类型。 @RezaRokni,感谢您的评论。你能给这个用例举个例子吗?我不明白。里面没有元数据。 我可能不了解您的用例,但您可以将 glob 模式与您的 text.io.ReadFromText("gs://my_bucket/*.txt') 一起使用。然后使用您的 beam.Map (toJson)。 beam 抱怨它需要读取 num_bytes 并且当我在读取中提供 num_bytes 时它说 JsonDecode 错误。 啊抱歉,我浏览了您的示例并错过了您尝试读取整个文件,而不是从中读取行。您是否已经尝试过使用 fileio 而不是 textio? textio 从由换行符分隔的文件中读取行。 fileio 生成代表文件及其元数据的记录集合 【参考方案1】:

我解决了如何使其适用于多个文件,但无法使其在单个管道中运行。我使用了 for 循环,然后使用了 beam.Flatten 选项。

这是我的解决方案:

file_list = ["gs://my_bucket/file*.txt.gz"]
res_list = ["/home/subject_test_-00000-of-00001.json".format(i) for i in range(len(file_list))]

with beam.Pipeline(options=PipelineOptions()) as p:
    for i,file in enumerate(file_list):
       (p 
        | "Read Text ".format(i) >> beam.io.textio.ReadFromText(file, skip_header_lines = 0)
        | "Write TExt ".format(i) >> beam.io.WriteToText("/home/subject_test_".format(i),
                   file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))

pcols = []
with beam.Pipeline(options=PipelineOptions()) as p:
   for i,res in enumerate(res_list):
         pcol = (p   | 'read_data_'.format(i) >> beam.Create([res])
            | "toJson_".format(i) >> beam.Map(toJson)
            | "takeItems_".format(i) >> beam.FlatMap(lambda line: line["Items"])
            | "takeSubjects_".format(i) >> beam.FlatMap(lambda line: line['data']['subjects']))
        pcols.append(pcol)
   out = (pcols
    | beam.Flatten()
    | beam.combiners.Count.PerElement()
    | beam.io.WriteToText("/home/items",
                   file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))

【讨论】:

使用ReadAllFromText 并将PCollection 作为input 传递不是更容易吗? @GuillemXercavins,感谢您的评论,我无法使用 toJson 函数。我仍在尝试在一个管道而不是两个管道中完成所有这些工作。为此,我在“toJson”函数中尝试了 apache_beam.io.gcp.gcsfilesystem,但 read() 失败说我必须给出“num_bytes”,当我将 json 放入其中时不会发生这种情况。 @JonsiBillups 在这种情况下你如何触发你的管道。我正在做类似的事情,但是当我执行 pipeline.run() 时,由于某种原因,它进入了无限循环。 @AmrutaDeshmukh,我通过 GCP 上的 Dataflow 运行器触发运行,见下文:link

以上是关于如何从 GCP 存储桶中读取 Apache Beam 中的多个文件的主要内容,如果未能解决你的问题,请参考以下文章

如何在Python中更有效地审计GCP存储桶中的数千个对象

GCP apache气流,如何从私有存储库安装Python依赖项

如何从 S3 存储桶中仅读取最近 7 天的 csv 文件

如何从 S3 存储桶中读取最后修改的 csv 文件?

如何从 Spark 中的多个云存储桶中读取 TXT 文件?

如何通过 http 触发器从 AWS SNS 触发 GCP 云功能(私有)