如何在 Databricks 中迭代以读取存储在数据湖不同子目录中的数百个文件?
Posted
技术标签:
【中文标题】如何在 Databricks 中迭代以读取存储在数据湖不同子目录中的数百个文件?【英文标题】:How to iterate in Databricks to read hundreds of files stored in different subdirectories in a Data Lake? 【发布时间】:2020-06-17 10:25:04 【问题描述】:我必须从 Azure Data Lake Gen2 读取 Databricks 中的数百个 avro 文件,从每个文件内的 Body 字段中提取数据,并将所有提取的数据连接到一个唯一的数据框中。关键是所有要读取的 avro 文件都存储在湖中的不同子目录中,遵循以下模式:
root/YYYY/MM/DD/HH/mm/ss.avro
这迫使我循环提取和选择数据。我正在使用这个 Python 代码,其中 list_avro_files 是所有文件的路径列表:
list_data = []
for file_avro in list_avro_files:
df = spark.read.format('avro').load(file_avro)
data1 = spark.read.json(df.select(df.Body.cast('string')).rdd.map(lambda x: x[0]))
list_data.append(data1)
data = reduce(DataFrame.unionAll, list_data)
有什么方法可以更有效地做到这一点?如何并行化/加速这个过程?
【问题讨论】:
【参考方案1】:只要您的list_avro_files
可以通过标准通配符语法表示,您就可以使用Spark 自己的并行读取操作能力。您只需为您的 avro 文件指定 basepath
和文件名模式:
scala> var df = spark.read
.option("basepath","/user/hive/warehouse/root")
.format("avro")
.load("/user/hive/warehouse/root/*/*/*/*.avro")
而且,如果您发现需要确切知道任何给定行来自哪个文件,请使用 input_file_name()
内置函数来丰富您的数据框:
scala> df = df.withColumn("source",input_file_name())
【讨论】:
以上是关于如何在 Databricks 中迭代以读取存储在数据湖不同子目录中的数百个文件?的主要内容,如果未能解决你的问题,请参考以下文章
使用 azure databricks 读取 azure databricks 日志 json 文件
在 Python 中读取 SQL 文件以通过 databricks 对 JDBC 运行