Spark DataFrame重新分区:未保留的分区数

Posted

技术标签:

【中文标题】Spark DataFrame重新分区:未保留的分区数【英文标题】:Spark DataFrame repartition : number of partition not preserved 【发布时间】:2017-01-25 15:01:36 【问题描述】:

根据 Spark 1.6.3 的文档,repartition(partitionExprs: Column*) 应该保留结果数据帧中的分区数:

返回由给定分区划分的新 DataFrame 保留现有分区数的表达式

(取自https://spark.apache.org/docs/1.6.3/api/scala/index.html#org.apache.spark.sql.DataFrame)

但以下示例似乎显示了其他内容(请注意,在我的情况下 spark-master 是 local[4]):

val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[4]"))
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._

val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x")
myDF.rdd.getNumPartitions // 4 
myDF.repartition($"x").rdd.getNumPartitions //  200 !

这怎么解释?我将 Spark 1.6.3 用作独立应用程序(即在 IntelliJ IDEA 中本地运行)

编辑:这个问题没有解决来自Dropping empty DataFrame partitions in Apache Spark 的问题(即如何在不产生空分区的情况下沿列重新分区),但为什么文档说的内容与我在示例中观察到的不同

【问题讨论】:

shuffle 参数设置为 true 吗? @FaigB 不确定您指的是哪个参数?我在问题中添加了 Spark-Conf Dropping empty DataFrame partitions in Apache Spark的可能重复 【参考方案1】:

这与在 Spark 中启用的 Tungsten project 相关。它使用硬件优化并调用哈希分区来触发 shuffle 操作。默认情况下,spark.sql.shuffle.partitions 设置为 200。您可以在重新分区之前和之后调用数据帧上的说明来验证:

myDF.explain

val repartitionedDF = myDF.repartition($"x")

repartitionedDF.explain

【讨论】:

在随机播放中也使用散列,分区的数量将根据映射器和减速器任务的数量而增加。

以上是关于Spark DataFrame重新分区:未保留的分区数的主要内容,如果未能解决你的问题,请参考以下文章

在 RDD 转换上保留 Spark DataFrame 列分区

如何保证 Spark Dataframe 中的重新分区

Spark中的最佳重新分区方式

计算spark Dataframe中分组数据的分位数

Apache Spark 数据帧在写入镶木地板时不会重新分区

如何在 Spark 中使用 Python 查找 DataFrame 中的分区数以及如何在 Spark 中使用 Python 在 DataFrame 中创建分区