如何在 Databricks 中迭代以读取存储在数据湖不同子目录中的数百个文件?

Posted

技术标签:

【中文标题】如何在 Databricks 中迭代以读取存储在数据湖不同子目录中的数百个文件?【英文标题】:How to iterate in Databricks to read hundreds of files stored in different subdirectories in a Data Lake? 【发布时间】:2020-06-17 10:25:04 【问题描述】:

我必须从 Azure Data Lake Gen2 读取 Databricks 中的数百个 avro 文件,从每个文件内的 Body 字段中提取数据,并将所有提取的数据连接到一个唯一的数据框中。关键是所有要读取的 avro 文件都存储在湖中的不同子目录中,遵循以下模式:

root/YYYY/MM/DD/HH/mm/ss.avro

这迫使我循环提取和选择数据。我正在使用这个 Python 代码,其中 list_avro_files 是所有文件的路径列表:

list_data = []

for file_avro in list_avro_files:
  df = spark.read.format('avro').load(file_avro)
  data1 = spark.read.json(df.select(df.Body.cast('string')).rdd.map(lambda x: x[0]))
  list_data.append(data1)

data = reduce(DataFrame.unionAll, list_data)

有什么方法可以更有效地做到这一点?如何并行化/加速这个过程?

【问题讨论】:

【参考方案1】:

只要您的list_avro_files 可以通过标准通配符语法表示,您就可以使用Spark 自己的并行读取操作能力。您只需为您的 avro 文件指定 basepath 和文件名模式:

scala> var df = spark.read
                 .option("basepath","/user/hive/warehouse/root")
                 .format("avro")
                 .load("/user/hive/warehouse/root/*/*/*/*.avro")

而且,如果您发现需要确切知道任何给定行来自哪个文件,请使用 input_file_name() 内置函数来丰富您的数据框:

scala> df = df.withColumn("source",input_file_name())

【讨论】:

以上是关于如何在 Databricks 中迭代以读取存储在数据湖不同子目录中的数百个文件?的主要内容,如果未能解决你的问题,请参考以下文章

使用 azure databricks 读取 azure databricks 日志 json 文件

在 Python 中读取 SQL 文件以通过 databricks 对 JDBC 运行

如何使用 SAS 读取 Azure databricks 中的 blob

如何在 Databricks 中管理 S3 挂载的权限

如何在 DataBricks 中并行读取文件?

使用 /mnt/ 将数据从 Azure Blob 存储读取到 Azure Databricks