Spark DataFrame/RDD 中的前 N ​​个项目

Posted

技术标签:

【中文标题】Spark DataFrame/RDD 中的前 N ​​个项目【英文标题】:Top N items from a Spark DataFrame/RDD 【发布时间】:2018-02-13 20:28:12 【问题描述】:

我的要求是从数据框中获取前 N 个项目。

我有这个数据框:

val df = List(
      ("MA", "USA"),
      ("MA", "USA"),
      ("OH", "USA"),
      ("OH", "USA"),
      ("OH", "USA"),
      ("OH", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("CT", "USA"),
      ("CT", "USA"),
      ("CT", "USA"),
      ("CT", "USA"),
      ("CT", "USA")).toDF("value", "country")

我能够将其映射到RDD[((Int, String), Long)] colValCount: 读取:((colIdx, value), count)

((0,CT),5)
((0,MA),2)
((0,OH),4)
((0,NY),6)
((1,USA),17)

现在我需要获取每个列索引的前 2 个项目。所以我的预期输出是这样的:

RDD[((Int, String), Long)]

((0,CT),5)
((0,NY),6)
((1,USA),17)

我尝试在 DataFrame 中使用 freqItems api,但速度很慢。

欢迎提出任何建议。

【问题讨论】:

我认为您需要 sort()limit() 的某种组合,但我不明白您是如何获得输出的。 【参考方案1】:

例如:

import org.apache.spark.sql.functions._

df.select(lit(0).alias("index"), $"value")
   .union(df.select(lit(1), $"country"))
   .groupBy($"index", $"value")
   .count
  .orderBy($"count".desc)
  .limit(3)
  .show

// +-----+-----+-----+
// |index|value|count|
// +-----+-----+-----+
// |    1|  USA|   17|
// |    0|   NY|    6|
// |    0|   CT|    5|
// +-----+-----+-----+

地点:

df.select(lit(0).alias("index"), $"value")
  .union(df.select(lit(1), $"country"))

创建一个两列DataFrame

// +-----+-----+
// |index|value|
// +-----+-----+
// |    0|   MA|
// |    0|   MA|
// |    0|   OH|
// |    0|   OH|
// |    0|   OH|
// |    0|   OH|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    1|  USA|
// |    1|  USA|
// |    1|  USA|
// +-----+-----+

如果您希望每列有两个值:

import org.apache.spark.sql.DataFrame

def topN(df: DataFrame, key: String, n: Int)  = 
   df.select(
        lit(df.columns.indexOf(key)).alias("index"), 
        col(key).alias("value"))
     .groupBy("index", "value")
     .count
     .orderBy($"count")
     .limit(n)


topN(df, "value", 2).union(topN(df, "country", 2)).show
// +-----+-----+-----+ 
// |index|value|count|
// +-----+-----+-----+
// |    0|   MA|    2|
// |    0|   OH|    4|
// |    1|  USA|   17|
// +-----+-----+-----+

就像pault said - 只是“sort()limit() 的某种组合”。

【讨论】:

orderBy()limit() 的某种组合...基本上就是我所说的 :-) @pault 是的,我猜是 groupByagg 的某种组合,这里不时带有窗口函数 :-) .orderBy($"count".desc).limit(3) 在这种情况下给出了指定的结果,但在一般情况下它没有给出每个列索引的前 2 项。 谢谢让我试试看! @user8371915 这可行,但对于 TB 数据仍然很慢,因为我也在运行 170 多列。性能与我之前提到的 freqItems API 相当。处理整个数据大约需要 1 个多小时。【参考方案2】:

执行此操作的最简单方法(自然窗口函数)是编写 SQL。 Spark 自带 SQL 语法,而 SQL 是解决这个问题的绝佳工具。

将您的数据框注册为临时表,然后对其进行分组和窗口化。

spark.sql("""SELECT idx, value, ROW_NUMBER() OVER (PARTITION BY idx ORDER BY c DESC) as r 
             FROM (
               SELECT idx, value, COUNT(*) as c 
               FROM (SELECT 0 as idx, value FROM df UNION ALL SELECT 1, country FROM df) 
               GROUP BY idx, value) 
             HAVING r <= 2""").show()

我想看看是否有任何程序/scala 方法可以让您在没有迭代或循环的情况下执行窗口函数。我不知道 Spark API 中有任何支持它的东西。

顺便说一句,如果您想要包含任意数量的列,那么您可以很容易地使用列列表动态生成内部部分(SELECT 0 as idx, value ... UNION ALL SELECT 1, country 等)。

【讨论】:

谢谢,我已经尝试使用它抱怨 GC 开销的窗口函数。我也会试试你的解决方案。【参考方案3】:

鉴于您的最后一个 RDD:

val rdd =
  sc.parallelize(
    List(
      ((0, "CT"), 5),
      ((0, "MA"), 2),
      ((0, "OH"), 4),
      ((0, "NY"), 6),
      ((1, "USA"), 17)
    ))

rdd.filter(_._1._1 == 0).sortBy(-_._2).take(2).foreach(println)
> ((0,NY),6)
> ((0,CT),5)
rdd.filter(_._1._1 == 1).sortBy(-_._2).take(2).foreach(println)
> ((1,USA),17)

我们首先获取给定列索引 (.filter(_._1._1 == 0)) 的项目。然后我们按降序对项目进行排序 (.sortBy(-_._2))。最后,我们最多取前 2 个元素 (.take(2)),如果记录的 nbr 小于 2,则只取 1 个元素。

【讨论】:

【参考方案4】:

您可以使用 Sparkz 中定义的这个辅助函数映射每个单独的分区,然后将它们组合在一起:

package sparkz.utils

import scala.reflect.ClassTag

object TopElements 
  def topN[T: ClassTag](elems: Iterable[T])(scoreFunc: T => Double, n: Int): List[T] =
    elems.foldLeft((Set.empty[(T, Double)], Double.MaxValue)) 
      case (accumulator@(topElems, minScore), elem) =>
        val score = scoreFunc(elem)
        if (topElems.size < n)
          (topElems + (elem -> score), math.min(minScore, score))
        else if (score > minScore) 
          val newTopElems = topElems - topElems.minBy(_._2) + (elem -> score)
          (newTopElems, newTopElems.map(_._2).min)
        
        else accumulator
    
      ._1.toList.sortBy(_._2).reverse.map(_._1)

来源:https://github.com/gm-spacagna/sparkz/blob/master/src/main/scala/sparkz/utils/TopN.scala

【讨论】:

以上是关于Spark DataFrame/RDD 中的前 N ​​个项目的主要内容,如果未能解决你的问题,请参考以下文章

[Spark][Python][DataFrame][RDD]从DataFrame得到RDD的例子

spark Dataframe/RDD 相当于描述中给出的 pandas 命令?

如何在 PySpark 中为一个组迭代 Dataframe / RDD 的每一行。?

SparkSQL入门

回顾IDEA 开发 SparkSQL 基础编程

使用 spark 和 scala 根据总时间查找每个 ID 的前 N ​​个游戏