在 spark DF 中使用 partitionBy 后是不是可以重新分区?

Posted

技术标签:

【中文标题】在 spark DF 中使用 partitionBy 后是不是可以重新分区?【英文标题】:Is it possible to do repartition after using partitionBy in a spark DF?在 spark DF 中使用 partitionBy 后是否可以重新分区? 【发布时间】:2019-02-08 07:09:53 【问题描述】:

我问这个问题是因为如果我将 repartition 指定为 5,那么我的所有数据(>200Gigs)都会被移动到 5 个不同的执行器,并且 98% 的资源未被使用。然后 partitionBy 正在发生,这再次造成了很多洗牌。有没有办法先进行 partitionBy,然后再对数据运行 repartition?

【问题讨论】:

答案是否得到您的认可? 【参考方案1】:

虽然这个问题并不完全容易理解,但以下与另一个答案一致,这种方法应该避免不必要的改组中提到的问题:

val n = [... some calculation for number of partitions / executors based on cluster config and volume of data to process ...]

df.repartition(n, $"field_1", $"field_2", ...)
  .sortWithinPartitions("fieldx", "field_y")
  .write.partitionBy("field_1", "field_2", ...)
  .format("location")

其中 [field_1, field_2, ...] 是 repartition 和 partitionBy 的同一组字段。

【讨论】:

【参考方案2】:

您可以使用repartition(5, col("$colName"))。 因此,当您制作 partitionBy("$colName") 时,您将跳过 '$colName' 的随机播放,因为它已经被重新分区。

还可以考虑将执行程序数量乘以已用内核数量乘以 3 的乘积(尽管这可能在 2 和 4 之间变化)。 所以我们知道,Spark 只能为 RDD 的每个分区运行 1 个并发任务。假设每个执行器有 8 个核心和 5 个执行器: 您需要:8 * 5 * 3 = 120 个分区

【讨论】:

以上是关于在 spark DF 中使用 partitionBy 后是不是可以重新分区?的主要内容,如果未能解决你的问题,请参考以下文章

pySpark 在 UDF 中使用 Spark SQL/DF 的最佳选择?

如何在Spark中使用AND或OR条件

使用来自另一个 DF (Scala Spark) 的模式创建一个空 DF

使用另一个 RDD/df 在 Spark RDD 或数据帧中执行查找/翻译

在 spark DF 中使用 partitionBy 后是不是可以重新分区?

在 Spark 上打印查询 Hive 的物理计划