在Databricks(DBFS)中递归列出目录和子目录的文件

Posted

技术标签:

【中文标题】在Databricks(DBFS)中递归列出目录和子目录的文件【英文标题】:list the files of a directory and subdirectory recursively in Databricks(DBFS) 【发布时间】:2021-01-05 09:31:45 【问题描述】:

使用python/dbutils,如何在Databricks文件系统(DBFS)中递归显示当前目录和子目录的文件。

【问题讨论】:

walk/dbfs/mnt/my/mount/... 类似的路径使用(而不是dbfs:/mnt/my/mount/... - 请注意前缀) 【参考方案1】:

你也可以试试这个递归函数:

def lsR(path):
  return([fname for flist in [([fi.path] if fi.isFile() else lsR(fi.path)) for fi in dbutils.fs.ls(path)] for fname in flist])
                
lsR('/your/folder')

【讨论】:

如果您可以在代码中添加一些解释,答案会更有用。 我很乐意这样做,但我不确定您希望得到什么样的解释? lsR() 应该返回一个文件名列表,因此: 1. part1: [([fi.path] if fi.isFile() else lsR(fi.path)) for fi in dbutils.fs.ls(path)] 构建列表列表。对于 dbutils.fs.ls 的每个结果,如果 fi 是一个文件,它只放置一个列表,否则如果 fi 是一个目录,它会递归调用 lsR() 以获取文件名列表 2。然后 part1 通过 double “解包”理解 [fname for flist in for fname in flist] 这将 [['a'], ['b'], ['c', 'd', 'e']] 更改为 ['a', ' b', 'c', 'd', 'e']【参考方案2】:

这里列出了其他答案,但值得注意的是,databricks 将数据集存储为文件夹。

例如,您可能有一个名为 my_dataset_here 的“目录”,其中包含如下文件:

my_dataset_here/part-00193-111-c845-4ce6-8714-123-c000.snappy.parquet
my_dataset_here/part-00193-123-c845-4ce6-8714-123-c000.snappy.parquet
my_dataset_here/part-00193-222-c845-4ce6-8714-123-c000.snappy.parquet
my_dataset_here/part-00193-444-c845-4ce6-8714-123-c000.snappy.parquet
...

在一组典型的表格中会有这样的文件数以千计

尝试枚举此类文件夹中的每个文件可能需要很长时间...比如几分钟,因为对 dbutils.fs.ls 的单个调用必须返回每个结果的数组。

因此,一种幼稚的方法,例如:

stack = ["/databricks-datasets/COVID/CORD-19/2020-03-13"]
while len(stack) > 0:
  current_folder = stack.pop(0)
  for file in dbutils.fs.ls(current_folder):
    if file.isDir():
      stack.append(file.path)
      print(file.path)
    else:
      print(file.path)

确实会列出每个文件,但也需要很长时间才能完成。在我的测试环境中,枚举 50 多个表需要 8 分钟

但是,如果使用新的“delta”格式,则会在 delta 表文件夹中创建一个名为“_delta_log”的标准命名文件夹。

因此,我们可以修改我们的代码来检查每个文件夹,看看它是否是一个数据集,然后再尝试枚举文件夹的全部内容:

stack = ["/databricks-datasets/COVID/CORD-19/2020-03-13"]
while len(stack) > 0:
  current_folder = stack.pop(0)
  for file in dbutils.fs.ls(current_folder):
    if file.isDir():
      # Check if this is a delta table and do not recurse if so!
      try:
        delta_check_path = f"file.path/_delta_log"
        dbutils.fs.ls(delta_check_path)  # raises an exception if missing
        print(f"dataset: file.path")
      except:            
        stack.append(file.path)
        print(f"folder: file.path")
    else:
        print(f"file: file.path")

此代码在 38 秒内在相同的测试环境中运行。

在琐碎的情况下,幼稚的解决方案是可以接受的,但在现实世界的情况下它很快就会变得完全不可接受。

请注意,此代码仅适用于增量表;如果您使用 parquet/csv/任何格式,那么您就不走运了。

【讨论】:

【参考方案3】:

另一种实现可以使用生成器和yield 运算符来完成。对于yield from 运算符,您必须至少使用Python 3.3+ 并查看此great post 以更好地理解yield 运算符:

def get_dir_content(ls_path):
    for dir_path in dbutils.fs.ls(ls_path):
        if dir_path.isFile():
            yield dir_path.path
        elif dir_path.isDir() and ls_path != dir_path.path:
            yield from get_dir_content(dir_path.path)
    
list(get_dir_content('/databricks-datasets/COVID/CORD-19/2020-03-13'))

【讨论】:

【参考方案4】:

关于 dbutils.fs.ls(和 %fs 魔法命令)令人惊讶的是它似乎不支持任何递归开关。但是,由于 ls 函数返回 FileInfo 对象的列表,因此递归迭代它们以获取整个内容是非常简单的,例如:

def get_dir_content(ls_path):
  dir_paths = dbutils.fs.ls(ls_path)
  subdir_paths = [get_dir_content(p.path) for p in dir_paths if p.isDir() and p.path != ls_path]
  flat_subdir_paths = [p for subdir in subdir_paths for p in subdir]
  return list(map(lambda p: p.path, dir_paths)) + flat_subdir_paths
    

paths = get_dir_content('/databricks-datasets/COVID/CORD-19/2020-03-13')
[print(p) for p in paths]

【讨论】:

你也可以使用%ls -R <path>

以上是关于在Databricks(DBFS)中递归列出目录和子目录的文件的主要内容,如果未能解决你的问题,请参考以下文章

带有 python 的 Azure Databricks dbfs

Azure Databricks - 导出和导入 DBFS 文件系统

无法在 databricks 运行时版本 7 中使用 shell 命令访问 /dbfs/FileStore

如何使用 dbfs 之外的 Python 文件创建 Databricks 作业?

Databricks:将dbfs:/ FileStore文件下载到我的本地计算机?

将数据表从 Databricks dbfs 导出到 azure sql 数据库