将列表转换为 PCollection

Posted

技术标签:

【中文标题】将列表转换为 PCollection【英文标题】:Convert a list into a PCollection 【发布时间】:2021-05-15 21:29:20 【问题描述】:

我目前有一个DoFn,它查看一个存储桶并查看该存储桶和目录前缀中的所有文件。这个DoFn 返回一个列表而不是PCollection。如何将此列表转换为PCollection 可以被DoFn ConvertFileNames 使用?

  # List all the files within a subdir 
  class ListBlobs(beam.DoFn):
    def start_bundle(self):
      self.storage_client = storage.Client()

    def process(self, prefix):
      bucket = self.storage_client.bucket('xxx')
      return list(self.bucket.list_blobs(prefix=prefix))

  # Convert Blobs into filenames as patterns
  class ConvertFileNames(beam.DoFn):
    def process(self, blob):
      return 'gs://' + blob.bucket.name + blob.name

【问题讨论】:

【参考方案1】:

如beam documentation 中所述,Beam DoFn 的 process 方法返回一个可迭代的元素以放置到下游 PCollection 中。所以,在你的例子中,如果我有一个前缀的 PCollection,称之为prefix_pcoll,那么我可以写

blobs_pcoll = prefix_pcoll | beam.ParDo(ListBlobs())

blobs_pcoll 将包含带有此前缀的 blob 列表(即,list(self.bucket.list_blobs(prefix=prefix)) 在所有前缀上的串联)。然后你可以写

converted = blobs_pcoll | beam.ParDo(ConvertFileNames())

你也可以写

converted = blobs_pcoll | beam.Map(
    lambda blob: 'gs://' + blob.bucket.name + blob.name)

您可能还想查看apache_beam.io.fileio.MatchAll。

【讨论】:

以上是关于将列表转换为 PCollection的主要内容,如果未能解决你的问题,请参考以下文章

将字符串列表转换为整数列表[重复]

将浮点列表列表转换为一个列表[重复]

如何将列表转换为数据表[重复]

将每行从整数列表转换为整数数组?

如何将列表列表转换为数据框?(熊猫)

将列表列表转换为Python中的字典字典