在 Pyspark Databricks 中处理 1000 个 JSON 文件

Posted

技术标签:

【中文标题】在 Pyspark Databricks 中处理 1000 个 JSON 文件【英文标题】:working with 1000's of JSON files in Pyspark Databricks 【发布时间】:2020-01-02 19:38:26 【问题描述】:

我有大约 2.5 k JSON 文件,每个 JSON 文件代表 1 行。有了这些文件,我需要做一些非常简单的 ETL 并将它们移动到我的数据湖的 curated 部分。

我遍历我的数据湖并通过简单的.read 调用调用我的 JSON 文件,我事先定义了我的 JSON 架构。

然后我执行 ETL 并尝试将这些文件写入数据湖的单独部分,但是写入部分非常慢,写一个文件只需要 15 分钟几百kb?

rp  = spark.read.json(paths, multiLine=True,schema=json_s).withColumn('path',F.input_file_name())

for iter in iterable:
    #do stuff
    # filter my sparkDF with .filter
    SparkDF_F = sparkDF.filter(...)
    sparkDF_F.write('path/filename.parquet')

我尝试使用 'OPTIMIZE' 并在我的路径上调用它

%sql
OPTIMIZE delta.'dbfs:/mnt/raw/data/table'

这会引发以下错误。

Error in SQL statement: ParseException: 
mismatched input 'dbfs:/mnt/raw/data/table' expecting 'SELECT', 'FROM', '
ADD', 'AS', 'TIMESTAMP', 'VERSION', 'ALL', 'ANY', 'DISTINCT', 
'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER.... 

有人能指导我了解我在这里的误解吗?

设置

Azure 数据块 6.0 火花 2.4 Python 3.6 42GB 集群,12 核。 4 个节点 Azure Gen1 DataLake。

【问题讨论】:

【参考方案1】:

两件事:

    如果 2.5k JSON 文件存储在同一文件夹中。您可以使用相同的文件夹路径直接读取它们:

    rp = spark.read.json(path_common, multiLine=True,schema=json_s).withColumn('path',F.input_file_name())

然后,您可以在整个数据帧中应用 rp.filter,因为它只有一个(无需对每个文件进行迭代)

    关于Delta的文档,你只能优化一个表(存储在dbfs中),不能直接优化一个DBFS文件。因此,您可以使用 dbfs 中指向的目录创建表,并使用文档中建议的优化:https://docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html

希望对你有帮助

【讨论】:

确实如此,我明天将首先测试。不幸的是,json 不在同一个文件夹中,因为我们以一种可以在未来 n 年内访问的方式存储它们。 您确实需要单个文件夹中的文件(或可以从顶部读取然后过滤的单个父文件夹)。即使这意味着首先制作临时副本。您是否考虑过在文件上使用 TTL 来满足存储 n 年的问题?这可以让您以更传统的方式构建湖泊。如果不可能,我会考虑使用 azure 函数而不是 databricks。从它的声音来看,数据很小 - 集群似乎很大,因为它正在做什么? @simon_dmorias 很抱歉,我的文件夹结构是 raw > year > month > day -- files,所以所有人都在同一年/月内固定的天数。由于客户端架构,我被锁定在使用数据块上,这有点矫枉过正,但它是整个更大解决方案的一部分。 嗯,好的。因此,您只需指向根文件夹并全部导入即可。然后过滤数据框以删除不需要的任何内容。删除该循环将产生最大的不同。 Spark 有点垃圾,有很多小文件,但性能应该比你目前看到的要好。 @simon_dmorias 我仍然有单级循环,但它的执行速度确实比昨晚快几个数量级,遗憾的是在 Databricks 6.0 + Python API 中不再可以访问 dbfs 文件存储 - 所以使用 Pandas 也是不可能的!

以上是关于在 Pyspark Databricks 中处理 1000 个 JSON 文件的主要内容,如果未能解决你的问题,请参考以下文章

无法使用 pySpark 从 Databricks 在 Cosmos DB/documentDB 中写入数据帧

无法在 Databricks 中使用 pyspark 读取 json 文件

如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame

将数据存储到 PySpark (Azure - DataBricks) 中的数据库非常慢

如何使用 Python / Pyspark 在 Databricks 笔记本中合并数据帧

PySpark:如何更新嵌套列?