如何在不产生 .rdd 成本的情况下检查 Spark DataFrame 的分区数
Posted
技术标签:
【中文标题】如何在不产生 .rdd 成本的情况下检查 Spark DataFrame 的分区数【英文标题】:How to check the number of partitions of a Spark DataFrame without incurring the cost of .rdd 【发布时间】:2019-06-13 14:18:37 【问题描述】:关于如何获得RDD
和/或DataFrame
的分区数有很多问题:答案总是:
rdd.getNumPartitions
或
df.rdd.getNumPartitions
不幸的是,这是对DataFrame
的昂贵操作,因为
df.rdd
需要将DataFrame
转换为rdd
。这是按运行时间排序的
df.count
我正在编写 可选 repartition
's 或 coalesce
'sa DataFrame
的逻辑 - 基于 当前 分区数是否在一个范围内可接受的值或低于或高于它们。
def repartition(inDf: DataFrame, minPartitions: Option[Int],
maxPartitions: Option[Int]): DataFrame =
val inputPartitions= inDf.rdd.getNumPartitions // EXPENSIVE!
val outDf = minPartitions.flatMap minp =>
if (inputPartitions < minp)
info(s"Repartition the input from $inputPartitions to $minp partitions..")
Option(inDf.repartition(minp))
else
None
.getOrElse( maxPartitions.map maxp =>
if (inputPartitions > maxp)
info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
inDf.coalesce(maxp)
else inDf
.getOrElse(inDf))
outDf
但我们不能以这种方式为每个 DataFrame
支付rdd.getNumPartitions
的费用。
是否没有任何方法可以获取此信息 - 例如从在线/临时catalog
查询registered
表可能吗?
更新 Spark GUI 显示 DataFrame.rdd 操作与作业中最长的 sql 一样长。我将重新运行该作业并在此处附上屏幕截图。
以下只是一个测试用例:它使用的是生产中数据大小的一小部分。最长的 sql
只有五分钟 - 而这个也将花费这么多时间(请注意,sql
没有帮助这里:它还必须随后执行,从而有效地使累积执行时间加倍)。
我们可以看到DataFrameUtils
第 30 行的.rdd
操作(如上面的 sn-p 所示)需要 5.1 分钟 - 而 save
操作仍然 需要 5.2 分钟-IE就后续save
的执行时间而言,我们确实没有通过执行.rdd
节省任何时间。
【问题讨论】:
这里有一个类似的问题***.com/questions/54269477/… 【参考方案1】:rdd.getNumPartitions
中的rdd
组件没有内在成本,因为返回的RDD
永远不会被评估。
虽然您可以通过使用调试器(我将把它作为练习留给读者)轻松地凭经验确定这一点,或者确定在基本案例场景中没有触发任何作业
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]
scala> ds.rdd.getNumPartitions
res0: Int = 1
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true
这可能不足以说服您。所以让我们以更系统的方式来解决这个问题:
rdd
返回一个MapPartitionRDD
(ds
如上所述):
scala> ds.rdd.getClass
res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD
RDD.getNumPartitions
invokes RDD.partitions
.
RDD.partitions
invokes getPartitions
(也可以随意跟踪检查点路径)。
RDD.getPartitions
is abstract.
所以本例中使用的实际实现是MapPartitionsRDD.getPartitions
,即delegates the call to the parent。
rdd
和源之间只有MapPartitionsRDD
。
scala> ds.rdd.toDebugString
res3: String =
(1) MapPartitionsRDD[3] at rdd at <console>:26 []
| MapPartitionsRDD[2] at rdd at <console>:26 []
| MapPartitionsRDD[1] at rdd at <console>:26 []
| FileScanRDD[0] at rdd at <console>:26 []
同样,如果Dataset
包含一个交换,我们将跟随父母到最近的洗牌:
scala> ds.orderBy("value").rdd.toDebugString
res4: String =
(67) MapPartitionsRDD[13] at rdd at <console>:26 []
| MapPartitionsRDD[12] at rdd at <console>:26 []
| MapPartitionsRDD[11] at rdd at <console>:26 []
| ShuffledRowRDD[10] at rdd at <console>:26 []
+-(1) MapPartitionsRDD[9] at rdd at <console>:26 []
| MapPartitionsRDD[5] at rdd at <console>:26 []
| FileScanRDD[4] at rdd at <console>:26 []
请注意,这个案例特别有趣,因为我们实际上触发了一个作业:
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
res5: Boolean = false
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
res6: Array[Int] = Array(0)
这是因为我们遇到了无法静态确定分区的情况(请参阅Number of dataframe partitions after sorting? 和Why does sortBy transformation trigger a Spark job?)。
在这种情况下getNumPartitions
也会触发作业:
scala> ds.orderBy("value").rdd.getNumPartitions
res7: Int = 67
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null) // Note new job id
res8: Array[Int] = Array(1, 0)
但这并不意味着观察到的成本与.rdd
调用有某种关系。相反,在没有静态公式的情况下(例如,某些 Hadoop 输入格式,需要对数据进行全面扫描),查找 partitions
是一种内在成本。
请注意,此处提出的观点不应外推至Dataset.rdd
的其他应用程序。例如ds.rdd.count
确实会很昂贵而且很浪费。
【讨论】:
删除.rdd.getNumPartitions calls
– javadba 后,我的工作只用了一半以上的时间运行
@javadba 我相信我们之前讨论过这个 - 这不会以任何方式使这个答案无效 - 我明确指出计算分区可能很昂贵,并且在一般条件下,错误是将潜在成本归因于转换为外部类型。此外,我提出解决您提供的任何正式反例(抱歉,挥手或轶事证据不计算在内),或者完全修改(甚至撤销)我的答案,如果这样的假设反例证明它在某种程度上不正确或不完整。我真的没有其他东西可以提供。
啊,好吧,答案的第二部分可能有助于移动到(病房)顶部:因为第一部分因为不承认我的观察的有效性而脱落。我会奖励这个。【参考方案2】:
根据我的经验df.rdd.getNumPartitions
非常快,我从来没有遇到过超过一秒左右的时间。
你也可以试试
val numPartitions: Long = df
.select(org.apache.spark.sql.functions.spark_partition_id()).distinct().count()
这将避免使用.rdd
【讨论】:
我会试试这个:在任何情况下都会被认为是一种有趣的方法 与 df.rdd.getNumPartitions 相比,该命令是否提高了性能? @CarlosAG 我不这么认为以上是关于如何在不产生 .rdd 成本的情况下检查 Spark DataFrame 的分区数的主要内容,如果未能解决你的问题,请参考以下文章
如何在不收集的情况下将 RDD、Dataframe 或 Dataset 直接转换为广播变量?
如何在不使用collect()和for循环的情况下将一个(IP地址)的特定部分与RDD python pyspark中另一列中的其他IP地址进行比较
如何判断一个函数是不是在不执行函数的情况下返回 Promise?