使用 SparkSQL 读取多个 parquet 文件时将子文件夹作为列获取

Posted

技术标签:

【中文标题】使用 SparkSQL 读取多个 parquet 文件时将子文件夹作为列获取【英文标题】:Get the subfolder as a column while reading multiple parquet files with SparkSQL 【发布时间】:2020-07-09 13:38:32 【问题描述】:

我想为使用 SparkSQL 从 parquet 文件加载的每个 DataFrame 添加一列,以将路径的子字符串添加到文件,然后使其成为单个 DataFrame。

例如,在加载 .../subfolder1/my_parquet_file1.parquet.../subfolder2/my_parquet_file2.parquet 时,我希望有以下最终 DataFrame:

col1 | col2 | subfolder
------------------------
aaa  | bbb  | subfolder1
ccc  | ddd  | subfolder1
eee  | fff  | subfolder2
ggg  | hhh  | subfolder2

以下代码允许加载路径列表中的所有文件:

sqlContext.read.schema(schema).parquet(paths: _*)

但是通过直接使用最终的 DataFrame,我无法根据每行的来源添加子文件夹。

有没有办法在不按顺序加载每个文件的情况下做到这一点?

【问题讨论】:

【参考方案1】:

试试这个 -

val df = spark.read
      .parquet(
        getClass.getResource("/parquet/day/day1/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy.parquet")
          .getPath,
        getClass.getResource("/parquet/day/day2/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy.parquet")
          .getPath
      )
    df.show(false)
    df.printSchema()

    /**
      * +------+
      * |price |
      * +------+
      * |123.15|
      * |123.15|
      * +------+
      *
      * root
      * |-- price: decimal(5,2) (nullable = true)
      */

    df.withColumn("subfolder", element_at(split(input_file_name(), "/"), -2))
      .show(false)

    /**
      * +------+---------+
      * |price |subfolder|
      * +------+---------+
      * |123.15|day1     |
      * |123.15|day2     |
      * +------+---------+
      */

【讨论】:

以上是关于使用 SparkSQL 读取多个 parquet 文件时将子文件夹作为列获取的主要内容,如果未能解决你的问题,请参考以下文章

SparkSQL - 直接读取镶木地板文件

Spark SQL - 如何将 DataFrame 写入文本文件?

Parquet性能测试之项目实践中应用测试

从多个 parquet 路径创建 Spark SQL 表

12.spark sql之读写数据

如何在 Spark 2.1.0 中使用 SparkSQL 将“.txt”转换为“.parquet”?