spark有效地找到一组列的最频繁值

Posted

技术标签:

【中文标题】spark有效地找到一组列的最频繁值【英文标题】:spark find most frequent value for a set of columns efficiently 【发布时间】:2016-12-21 19:38:05 【问题描述】:

我想知道在 spark 中是否有比使用rank() 更有效的方法来查找一组列的最频繁值,以便将其用作缺失值的插补。

例如在 spark-sql 我可以制定类似的东西 how to select the most frequently appearing values? 每列。 此解决方案适用于使用排名的单个列。我正在寻找的是a)更有效的变体(如第一个答案所述)和b)比使用for循环和a)的解决方案更优化的东西来申请多列。

您认为在 spark 中优化这个有什么可能性吗?

编辑

一个例子。这是一个小数据集

case class FooBarGG(foo: Int, bar: String, baz: String, dropme: String)
val df = Seq((0, "first", "A", "dropme"), (1, "second", "A", "dropme2"),
    (0, "first", "B", "foo"),
    (1, "first", "C", "foo"))
    .toDF("foo", "bar", "baz", "dropme").as[FooBarGG]
val columnsFactor = Seq("bar", "baz")
val columnsToDrop = Seq("dropme")
val factorCol= (columnsFactor ++ columnsToDrop).map(c => col(c))

从答案中查询

df.groupBy(factorCol: _*).count.agg(max(struct($"count" +: factorCol: _*)).alias("mostFrequent")).show
+--------------------+
|        mostFrequent|
+--------------------+
|[1,second,A,dropme2]|
+--------------------+
|-- mostFrequent: struct (nullable = true)
 |    |-- count: long (nullable = false)
 |    |-- bar: string (nullable = true)
 |    |-- baz: string (nullable = true)
 |    |-- dropme: string (nullable = true)

是结果,但对于列 bar -> first,baz -> A 和对于 drompe -> foo 是单个 top1 最常见的值,与返回的结果不同。

【问题讨论】:

【参考方案1】:

您可以使用简单的聚合,只要您的字段可以排序并且 count 是前导的:

import org.apache.spark.sql.functions._

val df = Seq("John", "Jane", "Eve", "Joe", "Eve").toDF("name")
val grouping = Seq($"name")

df.groupBy(grouping: _*).count.agg(max(struct($"count" +: grouping: _*)))

也可以使用静态类型的Dataset

import org.apache.spark.sql.catalyst.encoders.RowEncoder

df.groupByKey(x => x)(RowEncoder(df.schema)).count.reduce(
  (x, y) => if (x._2 > y._2) x else y
)

您可以调整分组列或键功能以处理更复杂的场景。

【讨论】:

【参考方案2】:
 // find most frequent value in the grouped columns
  def mode(df: DataFrame, valueColumnName: String, groupByColumns: Seq[String]): DataFrame = 
    df.groupBy(valueColumnName, groupByColumns: _*).count()
      .withColumn(
        "rn",
        row_number().over(Window.partitionBy(groupByColumns.head, groupByColumns.tail: _*).orderBy(col("count").desc))
      )
      .where(col("rn") === 1)
      .select(valueColumnName, groupByColumns: _*)
  

【讨论】:

以上是关于spark有效地找到一组列的最频繁值的主要内容,如果未能解决你的问题,请参考以下文章

根据另一列的位置从一组列中返回值

算法或 SQL:查找一组列的条件,确保结果集在特定列中的值始终 > 0

通过聚合在pandas组中查找频繁项的最有效方法是啥[重复]

有效地找到最近的字典键

有效地找到稀疏矩阵的最小列的索引

验证字段值的有效方法 Spark