如何在不产生 .rdd 成本的情况下检查 Spark DataFrame 的分区数

Posted

技术标签:

【中文标题】如何在不产生 .rdd 成本的情况下检查 Spark DataFrame 的分区数【英文标题】:How to check the number of partitions of a Spark DataFrame without incurring the cost of .rdd 【发布时间】:2019-06-13 14:18:37 【问题描述】:

关于如何获得RDD 和/或DataFrame 的分区数有很多问题:答案总是:

 rdd.getNumPartitions

 df.rdd.getNumPartitions

不幸的是,这是对DataFrame昂贵操作,因为

 df.rdd

需要将DataFrame 转换为rdd。这是按运行时间排序的

 df.count

我正在编写 可选 repartition's 或 coalesce'sa DataFrame 的逻辑 - 基于 当前 分区数是否在一个范围内可接受的值或低于或高于它们。

  def repartition(inDf: DataFrame, minPartitions: Option[Int],
       maxPartitions: Option[Int]): DataFrame = 
    val inputPartitions= inDf.rdd.getNumPartitions  // EXPENSIVE!
    val outDf = minPartitions.flatMap minp =>
      if (inputPartitions < minp) 
        info(s"Repartition the input from $inputPartitions to $minp partitions..")
        Option(inDf.repartition(minp))
       else 
        None
      
    .getOrElse( maxPartitions.map maxp =>
      if (inputPartitions > maxp) 
        info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
        inDf.coalesce(maxp)
       else inDf
    .getOrElse(inDf))
    outDf
  

但我们不能以这种方式为每个 DataFrame 支付rdd.getNumPartitions 的费用。

是否没有任何方法可以获取此信息 - 例如从在线/临时catalog 查询registered 表可能吗?

更新 Spark GUI 显示 DataFrame.rdd 操作与作业中最长的 sql 一样长。我将重新运行该作业并在此处附上屏幕截图。

以下只是一个测试用例:它使用的是生产中数据大小的一小部分。最长的 sql 只有五分钟 - 而这个也将花费这么多时间(请注意,sql没有帮助这里:它还必须随后执行,从而有效地使累积执行时间加倍)。

我们可以看到DataFrameUtils 第 30 行的.rdd 操作(如上面的 sn-p 所示)需要 5.1 分钟 - 而 save 操作仍然 需要 5.2 分钟-IE就后续save 的执行时间而言,我们确实没有通过执行.rdd 节省任何时间。

【问题讨论】:

这里有一个类似的问题***.com/questions/54269477/… 【参考方案1】:

rdd.getNumPartitions 中的rdd 组件没有内在成本,因为返回的RDD 永远不会被评估。

虽然您可以通过使用调试器(我将把它作为练习留给读者)轻松地凭经验确定这一点,或者确定在基本案例场景中没有触发任何作业

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]

scala> ds.rdd.getNumPartitions
res0: Int = 1

scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true

这可能不足以说服您。所以让我们以更系统的方式来解决这个问题:

rdd 返回一个MapPartitionRDDds 如上所述):

scala> ds.rdd.getClass
res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD

RDD.getNumPartitionsinvokes RDD.partitions.

在非检查点场景RDD.partitions invokes getPartitions(也可以随意跟踪检查点路径)。 RDD.getPartitionsis abstract. 所以本例中使用的实际实现是MapPartitionsRDD.getPartitions,即delegates the call to the parent。

rdd 和源之间只有MapPartitionsRDD

scala> ds.rdd.toDebugString
res3: String =
(1) MapPartitionsRDD[3] at rdd at <console>:26 []
 |  MapPartitionsRDD[2] at rdd at <console>:26 []
 |  MapPartitionsRDD[1] at rdd at <console>:26 []
 |  FileScanRDD[0] at rdd at <console>:26 []

同样,如果Dataset 包含一个交换,我们将跟随父母到最近的洗牌:

scala> ds.orderBy("value").rdd.toDebugString
res4: String =
(67) MapPartitionsRDD[13] at rdd at <console>:26 []
 |   MapPartitionsRDD[12] at rdd at <console>:26 []
 |   MapPartitionsRDD[11] at rdd at <console>:26 []
 |   ShuffledRowRDD[10] at rdd at <console>:26 []
 +-(1) MapPartitionsRDD[9] at rdd at <console>:26 []
    |  MapPartitionsRDD[5] at rdd at <console>:26 []
    |  FileScanRDD[4] at rdd at <console>:26 []

请注意,这个案例特别有趣,因为我们实际上触发了一个作业:

scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
res5: Boolean = false

scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
res6: Array[Int] = Array(0)

这是因为我们遇到了无法静态确定分区的情况(请参阅Number of dataframe partitions after sorting? 和Why does sortBy transformation trigger a Spark job?)。

在这种情况下getNumPartitions 也会触发作业:

scala> ds.orderBy("value").rdd.getNumPartitions
res7: Int = 67

scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)  // Note new job id
res8: Array[Int] = Array(1, 0)

但这并不意味着观察到的成本与.rdd 调用有某种关系。相反,在没有静态公式的情况下(例如,某些 Hadoop 输入格式,需要对数据进行全面扫描),查找 partitions 是一种内在成本。

请注意,此处提出的观点不应外推至Dataset.rdd 的其他应用程序。例如ds.rdd.count 确实会很昂贵而且很浪费。

【讨论】:

删除.rdd.getNumPartitions calls – javadba 后,我的工作只用了一半以上的时间运行 @javadba 我相信我们之前讨论过这个 - 这不会以任何方式使这个答案无效 - 我明确指出计算分区可能很昂贵,并且在一般条件下,错误是将潜在成本归因于转换为外部类型。此外,我提出解决您提供的任何正式反例(抱歉,挥手或轶事证据不计算在内),或者完全修改(甚至撤销)我的答案,如果这样的假设反例证明它在某种程度上不正确或不完整。我真的没有其他东西可以提供。 啊,好吧,答案的第二部分可能有助于移动到(病房)顶部:因为第一部分因为不承认我的观察的有效性而脱落。我会奖励这个。【参考方案2】:

根据我的经验df.rdd.getNumPartitions 非常快,我从来没有遇到过超过一秒左右的时间。

你也可以试试

val numPartitions: Long = df
      .select(org.apache.spark.sql.functions.spark_partition_id()).distinct().count()

这将避免使用.rdd

【讨论】:

我会试试这个:在任何情况下都会被认为是一种有趣的方法 与 df.rdd.getNumPartitions 相比,该命令是否提高了性能? @CarlosAG 我不这么认为

以上是关于如何在不产生 .rdd 成本的情况下检查 Spark DataFrame 的分区数的主要内容,如果未能解决你的问题,请参考以下文章

如何在不收集的情况下将 RDD、Dataframe 或 Dataset 直接转换为广播变量?

如何在不使用collect()和for循环的情况下将一个(IP地址)的特定部分与RDD python pyspark中另一列中的其他IP地址进行比较

如何判断一个函数是不是在不执行函数的情况下返回 Promise?

如何在不增加成本情况下使用Cloudflare Cron 触发器?

在不知道字典名称的情况下如何在字典中查找内容

如何在不检查 MySQL 中的 AUTOINCREMENT 的情况下增加主键