在不计算的情况下获取 Spark 数据框中的行数

Posted

技术标签:

【中文标题】在不计算的情况下获取 Spark 数据框中的行数【英文标题】:Getting the number of rows in a Spark dataframe without counting 【发布时间】:2019-05-17 13:36:57 【问题描述】:

我在 Spark DataFrame 上应用了许多转换(filter、groupBy、join)。我想在每次转换后获得 DataFrame 中的行数。

我目前正在使用函数 count() 在每次转换后计算行数,但这每次都会触发一个未真正优化的操作。

我想知道是否有任何方法可以知道行数而无需触发原始作业之外的其他操作。

【问题讨论】:

样本数据集和代码 sn-p 将不胜感激。 :-) 【参考方案1】:

您可以为每个阶段使用一个累加器,并在每个阶段之后在地图中增加累加器。然后,在您执行完操作后,您将计算所有阶段的计数。

val filterCounter = spark.sparkContext.longAccumulator("filter-counter")
val groupByCounter = spark.sparkContext.longAccumulator("group-counter")
val joinCounter = spark.sparkContext.longAccumulator("join-counter")

myDataFrame
    .filter(col("x") === lit(3))
    .map(x => 
      filterCounter.add(1)
      x
    )        .groupBy(col("x"))
    .agg(max("y"))
    .map(x => 
      groupByCounter.add(1)
      x
    )
    .join(myOtherDataframe, col("x") === col("y"))
    .map(x => 
      joinCounter.add(1)
      x
    )
    .count()

print(s"count for filter = $filterCounter.value")
print(s"count for group by = $groupByCounter.value")
print(s"count for join = $joinCounter.value")

【讨论】:

是否可以动态创建可变数量的累加器,并且可能将所有累加器存储在 Map[(String, Long)] 中?因为我动态地将转换添加到我的 DataFrame【参考方案2】:

每个运算符本身都有几个指标。这些指标在 spark UI 的 SQL 选项卡中可见。

如果不使用 SQL,我们可以在执行后自省数据框的查询执行对象,以访问指标(内部累加器)

示例:df.queryExecution.executedPlan.metrics 将给出 DAG 中最顶层节点的指标。

【讨论】:

使用df.queryExecution.executedPlan.metrics我总是得到结果Map(numOutputRows -> SQLMetric(id: 891, name: Some(number of output rows), value: 0))这正常吗? Ypou 可能正在执行以下操作 1. 在执行查询之前进行检查 0.2。或者,如果您的查询实际上返回零行。您可以浏览执行计划(它是一棵树)以查找每个节点上的指标。简单代码df.queryExecution.executedPlan.foreach(println(_.metrics)) 上面要在查询执行完之后再做。 我是在写入操作后执行此操作,但我的数据框的行数超过零。 ` df.queryExecution.executedPlan.foreach(x => println(x))` 将有助于找到开始发生指标丢失的节点。如果您可以访问 Spark UI,我们可以在执行后的 SQL 选项卡中显示这些指标。【参考方案3】:

在对 Apache Spark 有更多经验后回到这个问题,以补充 randal 的答案。

您还可以使用 UDF 来增加计数器。

val filterCounter = spark.sparkContext.longAccumulator("filter-counter")
val groupByCounter = spark.sparkContext.longAccumulator("group-counter")
val joinCounter = spark.sparkContext.longAccumulator("join-counter")

def countUdf(acc: LongAccumulator): UserDefinedFunction = udf  (x: Int) =>
  acc.add(1)
  x


myDataFrame
  .filter(col("x") === lit(3))
  .withColumn("x", countUdf(filterCounter)(col("x")))
  .groupBy(col("x"))
  .agg(max("y"))
  .withColumn("x", countUdf(groupByCounter)(col("x")))
  .join(myOtherDataframe, col("x") === col("y"))
  .withColumn("x", countUdf(joinCounter)(col("x")))
  .count()

print(s"count for filter = $filterCounter.value")
print(s"count for group by = $groupByCounter.value")
print(s"count for join = $joinCounter.value")

这应该更有效,因为 spark 只需反序列化 UDF 中使用的列,但必须小心使用,因为催化剂可以更轻松地重新排序操作(例如在调用 udf 之前推送过滤器)

【讨论】:

以上是关于在不计算的情况下获取 Spark 数据框中的行数的主要内容,如果未能解决你的问题,请参考以下文章

iPhone:如何在不考虑宽度的情况下获取textView中输入的行数

如何根据数据框中的列值获取特定的行数[重复]

计算火花数据框中的字数

如何计算包含一组列中的值和 Pandas 数据框中另一列中的另一个值的行数?

vim - 如何在不计算行数的情况下删除一大块文本?

为spark scala中的数据框中的每个组采样不同数量的随机行