在 spark DF 中使用 partitionBy 后是不是可以重新分区?
Posted
技术标签:
【中文标题】在 spark DF 中使用 partitionBy 后是不是可以重新分区?【英文标题】:Is it possible to do repartition after using partitionBy in a spark DF?在 spark DF 中使用 partitionBy 后是否可以重新分区? 【发布时间】:2019-02-08 07:09:53 【问题描述】:我问这个问题是因为如果我将 repartition 指定为 5,那么我的所有数据(>200Gigs)都会被移动到 5 个不同的执行器,并且 98% 的资源未被使用。然后 partitionBy 正在发生,这再次造成了很多洗牌。有没有办法先进行 partitionBy,然后再对数据运行 repartition?
【问题讨论】:
答案是否得到您的认可? 【参考方案1】:虽然这个问题并不完全容易理解,但以下与另一个答案一致,这种方法应该避免不必要的改组中提到的问题:
val n = [... some calculation for number of partitions / executors based on cluster config and volume of data to process ...]
df.repartition(n, $"field_1", $"field_2", ...)
.sortWithinPartitions("fieldx", "field_y")
.write.partitionBy("field_1", "field_2", ...)
.format("location")
其中 [field_1, field_2, ...] 是 repartition 和 partitionBy 的同一组字段。
【讨论】:
【参考方案2】:您可以使用repartition(5, col("$colName"))
。
因此,当您制作 partitionBy("$colName")
时,您将跳过 '$colName'
的随机播放,因为它已经被重新分区。
还可以考虑将执行程序数量乘以已用内核数量乘以 3 的乘积(尽管这可能在 2 和 4 之间变化)。 所以我们知道,Spark 只能为 RDD 的每个分区运行 1 个并发任务。假设每个执行器有 8 个核心和 5 个执行器: 您需要:8 * 5 * 3 = 120 个分区
【讨论】:
以上是关于在 spark DF 中使用 partitionBy 后是不是可以重新分区?的主要内容,如果未能解决你的问题,请参考以下文章
pySpark 在 UDF 中使用 Spark SQL/DF 的最佳选择?
使用来自另一个 DF (Scala Spark) 的模式创建一个空 DF
使用另一个 RDD/df 在 Spark RDD 或数据帧中执行查找/翻译