从 Apache Beam 中的多个文件夹读取文件并将输出映射到文件名

Posted

技术标签:

【中文标题】从 Apache Beam 中的多个文件夹读取文件并将输出映射到文件名【英文标题】:Read Files from multiple folders in Apache Beam and map outputs to filenames 【发布时间】:2018-12-28 05:48:14 【问题描述】:

处理从多个文件夹中读取文件,然后使用 python sdk 和数据流运行器将具有文件名的文件内容(文件内容,文件名)输出到 apache Beam 中的 bigquery。

最初以为我可以为每个文件创建一个 pcollection,然后将文件内容与文件名映射。

def read_documents(pipeline):
  """Read the documents at the provided uris and returns (uri, line) pairs."""
  pcolls = []
  count = 0
  with open(TESTIN) as uris:
       for uri in uris:
    #print str(uri).strip("[]/'")
         pcolls.append(
         pipeline
         | 'Read: uri' + str(uri)  >>ReadFromText(str(uri).strip("[]/'"), compression_type = 'gzip')
         | 'WithKey: uri'  + str(uri)   >> beam.Map(lambda v, uri: (v, str(uri).strip("[]")), uri) 
         )
       return pcolls | 'FlattenReadPColls' >> beam.Flatten()

这工作正常,但速度很慢,并且在大约 10000 个文件后无法在数据流云上工作。如果超过 10000 个左右的文件,它将遭受管道损坏。

目前正在尝试从 Text.io 重载 ReadAllFromText 函数。 Text.io 旨在从文件名或模式的集合中快速读取大量文件。如果从 Google 云存储读取并且文件具有内容编码,则此模块中存在错误。谷歌云存储会自动压缩文件并对其进行转码,但由于某种原因,ReadAllFromText 无法使用它。您必须更改文件的元数据以删除内容编码并将 ReadAllFromText 上的压缩类型设置为 gzip。我将包含此问题 url,以防其他人对 ReadAllFromText 有问题 https://issues.apache.org/jira/browse/BEAM-1874

我当前的代码如下所示

class ReadFromGs(ReadAllFromText):

    def __init__(self):
        super(ReadFromGs, self).__init__(compression_type="gzip")

    def expand(self, pvalue):
        files = self._read_all_files
        return (
            pvalue          
            | 'ReadAllFiles' >> files #self._read_all_files
            | 'Map values' >>  beam.Map( lambda v: (v, filename)) # filename is a placeholder for the input filename that im trying to figure out how to include in the output.
            )

ReadAllFromText 包含在 Text.io 中,从 filebasedsource.py 调用 ReadAllText 并继承自 PTransform。

我相信我只是缺少一些简单的东西。

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py

【问题讨论】:

【参考方案1】:

如您所见,ReadFromText 目前不支持动态文件名,您绝对不想为每个 URL 创建单独的步骤。从您的第一句话中,我了解到您希望将文件名和文件内容作为一个项目。这意味着您不需要或受益于文件的任何部分流。您可以简单地读取文件内容。比如:

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


def read_all_from_url(url):
    with FileSystems.open(url) as f:
        return f.read()


def read_from_urls(pipeline, urls):
    return (
        pipeline
        | beam.Create(urls)
        | 'Read File' >> beam.Map(lambda url: (
            url,
            read_all_from_url(url)
        ))
    )

如果您认为元数据存在问题,您可以对其进行自定义。输出将是一个元组(url文件内容)。如果您的文件内容非常大,您可能需要根据您的用例稍微不同的方法。

【讨论】:

我不记得我现在做了什么,但我相信我最初尝试过这样的事情,但它导致了内存错误或类似的事情。我会尝试这种方式,看看效果如何......我最终将文件名和位置放入 bigquery 并以这种方式进行处理 使用 beam.Create,url 仍然需要放入内存,整个列表被序列化,发送到云端并反序列化。如果它确实是一个很长的列表,那么将它保存在云中是有意义的,例如谷歌存储中的文本文件(但也可以是 BigQuery) - 任何真正发出 url 的东西。如果您想并行运行它,您还需要防止融合,但这是另一个问题。

以上是关于从 Apache Beam 中的多个文件夹读取文件并将输出映射到文件名的主要内容,如果未能解决你的问题,请参考以下文章

apache-beam 从 GCS 存储桶的多个文件夹中读取多个文件并加载它 bigquery python

Apache Beam 处理文件

Dataflow GCP(Apache Beam)-连续读取大量文件(OutOfMemory)

使用 Apache Beam 从数据库中读取批量数据

Apache Beam 使用多个表时的写入次数

Apache Beam TextIO.ReadAll(),处理丢失的文件名?