为啥 Spark DataFrame 会创建错误数量的分区?

Posted

技术标签:

【中文标题】为啥 Spark DataFrame 会创建错误数量的分区?【英文标题】:Why Spark DataFrame is creating wrong number of partitions?为什么 Spark DataFrame 会创建错误数量的分区? 【发布时间】:2017-07-03 05:23:47 【问题描述】:

我有一个包含 2 列的 spark 数据框 - col1col2

scala> val df = List((1, "a")).toDF("col1", "col2")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: string]

当我以parquet 格式在磁盘上写入df 时,要将所有数据写入文件数量等于col1 中唯一值的数量,我使用col1 执行repartition,就像这样:

scala> df.repartition(col("col1")).write.partitionBy("col1").parquet("file")

以上代码在文件系统中只生成一个文件。但是,shuffle 操作的次数变成了 200。

我在这里无法理解一件事,如果col1 只包含一个值,即1,那么它为什么要在repartition 中创建200 个分区?

【问题讨论】:

【参考方案1】:

repartition(columnName) 默认创建 200 个分区(更具体地说,spark.sql.shuffle.partitions 分区),无论 col1 有多少唯一值。如果只有 1 个唯一值 col1,则 199 个分区为空。另一方面,如果 col1 的唯一值超过 200 个,则每个分区将有多个 col1 值。

如果您只想要 1 个分区,那么您可以使用 repartition(1,col("col1")) 或只是 coalesce(1)。但并不是说coalesce 的行为不同,因为coalesce 我在你的代码中被进一步向上移动,你可能会失去并行性(参见How to prevent Spark optimization)

如果你想检查你的分区的内容,我为此做了两种方法:

// calculates record count per partition
def inspectPartitions(df: DataFrame) = 
    import df.sqlContext.implicits._
    df.rdd.mapPartitions(partIt => 
       Iterator(partIt.toSeq.size)
    
    ).toDF("record_count")


// inspects how a given key is distributed accross the partition of a dataframe
def inspectPartitions(df: DataFrame, key: String) = 
    import df.sqlContext.implicits._
    df.rdd.mapPartitions(partIt => 
      val part = partIt.toSet
      val partSize = part.size
        val partKeys = part.map(r => r.getAs[Any](key).toString.trim)
        val partKeyStr = partKeys.mkString(", ")
        val partKeyCount = partKeys.size
       Iterator((partKeys.toArray,partSize))
    
    ).toDF("partitions","record_count")

现在你可以例如像这样检查您的数据框:

inspectPartitions(df.repartition(col("col1"),"col1")
.where($"record_count">0)
.show

【讨论】:

【参考方案2】:

在Spark SQL shuffle世界中,shuffle partition的默认数量是200,由spark.sql.shuffle.partitions控制

【讨论】:

以上是关于为啥 Spark DataFrame 会创建错误数量的分区?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 中使用 Python 查找 DataFrame 中的分区数以及如何在 Spark 中使用 Python 在 DataFrame 中创建分区

为啥分配为 None 时会创建副本?

Julia:将 DataFrame 传递给函数会创建指向 DataFrame 的指针?

为啥 Wordpress 会创建一个巨大的 error_log 文件?

spark saveAsTable 真的会创建一个表吗?

为啥启用 Cloud Run API 会创建这么多服务帐号?为啥他们有这么多特权?