如何并行化 spark.read.parquet()?

Posted

技术标签:

【中文标题】如何并行化 spark.read.parquet()?【英文标题】:How to parallelize spark.read.parquet()? 【发布时间】:2020-02-11 20:09:52 【问题描述】:

我的 Spark 作业读取了一个文件夹,其中包含由列 partition 分区的 parquet 数据:

    val spark = SparkSession
      .builder()
      .appName("Prepare Id Mapping")
      .getOrCreate()
    import spark.implicits._

    spark.read
      .parquet(sourceDir)
      .filter($"field" === "ss_id" and $"int_value".isNotNull)
      .select($"int_value".as("ss_id"), $"partition".as("date"), $"ct_id")
      .coalesce(1)
      .write
      .partitionBy("date")
      .parquet(idMappingDir)

我注意到只创建了一个任务,所以它非常慢。源文件夹中有很多像partition=2019-01-07这样的子文件夹,每个子文件夹都包含很多扩展名为snappy.parquet的文件。我提交了作业 --num-executors 2 --executor-cores 4,RAM 不是问题。我尝试从 S3 和本地文件系统中读取。我尝试添加 .repartition(nPartitions),删除 .coalesce(1).partitionBy("date") 但还是一样。

您能否建议我如何让 Spark 并行读取这些 parquet 文件?

【问题讨论】:

没有什么奇怪的。如果您发现差异,请尝试阅读 sourceDir/2019-01-07 @Salim 这对我来说会有所帮助,因为我需要数据框中的分区列。 “合并架构”上的好发现。请更改您的问题以添加这种担忧,即每个分区可能具有不同的架构以帮助其他人找到答案。 【参考方案1】:

好吧,我找到了正确的代码:

    val spark = SparkSession
      .builder()
      .appName("Prepare Id Mapping")
      .getOrCreate()
    import spark.implicits._

    spark.read
      .option("mergeSchema", "true")
      .parquet(sourceDir)
      .filter($"field" === "ss_id" and $"int_value".isNotNull)
      .select($"int_value".as("ss_id"), $"partition".as("date"), $"ct_id")
      .write
      .partitionBy("date")
      .parquet(idMappingDir)

希望这会在将来节省一些时间。

【讨论】:

聚结取出来? @thebluephantom 只是取出合并并没有解决问题,但是加上 .option("mergeSchema", "true") 就可以了。 可能会强调这一点。干杯 我试图使这两行(带有合并和选项)以粗体显示,但它在代码 sn-p 中不起作用。

以上是关于如何并行化 spark.read.parquet()?的主要内容,如果未能解决你的问题,请参考以下文章

如何刷新 HDFS 路径?

Spark LuceneRDD - 它是如何工作的

使用 parquet 格式附加 Apache Spark 中列的描述

Spark数据框加入问题

如何使用具有不受支持类型的 Spark 读取镶木地板?

Spark缓慢重新分区许多小文件