Spark DataFrame重新分区:未保留的分区数
Posted
技术标签:
【中文标题】Spark DataFrame重新分区:未保留的分区数【英文标题】:Spark DataFrame repartition : number of partition not preserved 【发布时间】:2017-01-25 15:01:36 【问题描述】:根据 Spark 1.6.3 的文档,repartition(partitionExprs: Column*)
应该保留结果数据帧中的分区数:
返回由给定分区划分的新 DataFrame 保留现有分区数的表达式
(取自https://spark.apache.org/docs/1.6.3/api/scala/index.html#org.apache.spark.sql.DataFrame)
但以下示例似乎显示了其他内容(请注意,在我的情况下 spark-master 是 local[4]
):
val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[4]"))
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x")
myDF.rdd.getNumPartitions // 4
myDF.repartition($"x").rdd.getNumPartitions // 200 !
这怎么解释?我将 Spark 1.6.3 用作独立应用程序(即在 IntelliJ IDEA 中本地运行)
编辑:这个问题没有解决来自Dropping empty DataFrame partitions in Apache Spark 的问题(即如何在不产生空分区的情况下沿列重新分区),但为什么文档说的内容与我在示例中观察到的不同
【问题讨论】:
shuffle 参数设置为 true 吗? @FaigB 不确定您指的是哪个参数?我在问题中添加了 Spark-Conf Dropping empty DataFrame partitions in Apache Spark的可能重复 【参考方案1】:这与在 Spark 中启用的 Tungsten project 相关。它使用硬件优化并调用哈希分区来触发 shuffle 操作。默认情况下,spark.sql.shuffle.partitions 设置为 200。您可以在重新分区之前和之后调用数据帧上的说明来验证:
myDF.explain
val repartitionedDF = myDF.repartition($"x")
repartitionedDF.explain
【讨论】:
在随机播放中也使用散列,分区的数量将根据映射器和减速器任务的数量而增加。以上是关于Spark DataFrame重新分区:未保留的分区数的主要内容,如果未能解决你的问题,请参考以下文章
在 RDD 转换上保留 Spark DataFrame 列分区
Apache Spark 数据帧在写入镶木地板时不会重新分区
如何在 Spark 中使用 Python 查找 DataFrame 中的分区数以及如何在 Spark 中使用 Python 在 DataFrame 中创建分区