使用数据流在 gcs 存储桶上按大小列出文件夹

Posted

技术标签:

【中文标题】使用数据流在 gcs 存储桶上按大小列出文件夹【英文标题】:List folders by size on gcs bucket with dataflow 【发布时间】:2021-05-12 15:04:19 【问题描述】:

查看this 问题上的代码,我希望能够创建一个数据流管道,该管道可以查看特定 gcs 存储桶文件夹中的所有文件,并根据以下方面说明具有最大数据量的最终子目录字节。我会编写类似于:

的代码
class SortFiles(beam.DoFn):
  def __init__(self, gfs):
    self.gfs = gfs

  def process(self, file_metadata):
    if file_metadata.size_in_bytes > 0:
      # Sort the files here? 


class SortFolders(beam.DoFn):
  def __init__(self, gfs):
    self.gfs = gfs

  def process(self, file_metadata):
    if file_metadata.size_in_bytes > 0:
      # Sort the folders here based on maximum addition of a combination 
      # of the file sizes and file numbers 


def delete_empty_files():

    options = PipelineOptions(...)

    gfs = gcs.GCSFileSystem(pipeline_options)
    p = beam.Pipeline(options=pipeline_options)

    discover_empty = p | 'Filenames' >> beam.Create(gfs.match(gs_folder).metadata_list)
                        | 'Reshuffle' >> beam.Reshuffle() 
                        | 'SortFilesbySize' >> beam.ParDo(SortFiles(gfs))
                        | 'SortFoldersbySize' >> beam.ParDo(SortFolders(gfs))
                        | 'OutputFolders' >> ...

我还没有决定是按字节总数还是其中的文件总数列出文件夹。我将如何解决这个问题?另一个问题在于我希望能够找到最终的子目录,而不是这个任务的父文件夹。

【问题讨论】:

【参考方案1】:

GCSFileSystem 有一个函数du,它会告诉你特定路径下的大小。 https://gcsfs.readthedocs.io/en/latest/api.html?highlight=du#gcsfs.core.GCSFileSystem

在阅读您的问题时,我认为您想要

    首先找到存储桶中所有本身不包含目录的目录(如果我理解“最终子目录”) 然后对它们每个运行du, 然后按大小对结果列表进行排序

如果您尝试计算嵌套文件数:

    列出所有对象,名称将是 a/、a/b.txt、a/b/c.txt 等 编写一个函数来计算嵌套在每个子路径下的对象

【讨论】:

以上是关于使用数据流在 gcs 存储桶上按大小列出文件夹的主要内容,如果未能解决你的问题,请参考以下文章

在 Dataflow 的云存储桶上哪里可以找到这个 pubsub 订阅?

如何设置 GCS 存储桶的限制

如何使用 gsutil 命令列出 gcs 存储桶中的所有文件(包含所有文件夹)以匹配文件内容中的特定字符串

数据流作业 GCS 到 Pub/sub 最大批量大小

如何在 GCS 中的增量表之上创建 BQ 外部表并仅显示最新快照

自定义角色以允许下载文件并禁用 GCS 存储桶中的上传文件