Spark DataFrame/RDD 中的前 N 个项目
Posted
技术标签:
【中文标题】Spark DataFrame/RDD 中的前 N 个项目【英文标题】:Top N items from a Spark DataFrame/RDD 【发布时间】:2018-02-13 20:28:12 【问题描述】:我的要求是从数据框中获取前 N 个项目。
我有这个数据框:
val df = List(
("MA", "USA"),
("MA", "USA"),
("OH", "USA"),
("OH", "USA"),
("OH", "USA"),
("OH", "USA"),
("NY", "USA"),
("NY", "USA"),
("NY", "USA"),
("NY", "USA"),
("NY", "USA"),
("NY", "USA"),
("CT", "USA"),
("CT", "USA"),
("CT", "USA"),
("CT", "USA"),
("CT", "USA")).toDF("value", "country")
我能够将其映射到RDD[((Int, String), Long)]
colValCount:
读取:((colIdx, value), count)
((0,CT),5)
((0,MA),2)
((0,OH),4)
((0,NY),6)
((1,USA),17)
现在我需要获取每个列索引的前 2 个项目。所以我的预期输出是这样的:
RDD[((Int, String), Long)]
((0,CT),5)
((0,NY),6)
((1,USA),17)
我尝试在 DataFrame 中使用 freqItems api,但速度很慢。
欢迎提出任何建议。
【问题讨论】:
我认为您需要sort()
和 limit()
的某种组合,但我不明白您是如何获得输出的。
【参考方案1】:
例如:
import org.apache.spark.sql.functions._
df.select(lit(0).alias("index"), $"value")
.union(df.select(lit(1), $"country"))
.groupBy($"index", $"value")
.count
.orderBy($"count".desc)
.limit(3)
.show
// +-----+-----+-----+
// |index|value|count|
// +-----+-----+-----+
// | 1| USA| 17|
// | 0| NY| 6|
// | 0| CT| 5|
// +-----+-----+-----+
地点:
df.select(lit(0).alias("index"), $"value")
.union(df.select(lit(1), $"country"))
创建一个两列DataFrame
:
// +-----+-----+
// |index|value|
// +-----+-----+
// | 0| MA|
// | 0| MA|
// | 0| OH|
// | 0| OH|
// | 0| OH|
// | 0| OH|
// | 0| NY|
// | 0| NY|
// | 0| NY|
// | 0| NY|
// | 0| NY|
// | 0| NY|
// | 0| CT|
// | 0| CT|
// | 0| CT|
// | 0| CT|
// | 0| CT|
// | 1| USA|
// | 1| USA|
// | 1| USA|
// +-----+-----+
如果您希望每列有两个值:
import org.apache.spark.sql.DataFrame
def topN(df: DataFrame, key: String, n: Int) =
df.select(
lit(df.columns.indexOf(key)).alias("index"),
col(key).alias("value"))
.groupBy("index", "value")
.count
.orderBy($"count")
.limit(n)
topN(df, "value", 2).union(topN(df, "country", 2)).show
// +-----+-----+-----+
// |index|value|count|
// +-----+-----+-----+
// | 0| MA| 2|
// | 0| OH| 4|
// | 1| USA| 17|
// +-----+-----+-----+
就像pault said - 只是“sort()
和 limit()
的某种组合”。
【讨论】:
orderBy()
和 limit()
的某种组合...基本上就是我所说的 :-)
@pault 是的,我猜是 groupBy
和 agg
的某种组合,这里不时带有窗口函数 :-)
.orderBy($"count".desc).limit(3)
在这种情况下给出了指定的结果,但在一般情况下它没有给出每个列索引的前 2 项。
谢谢让我试试看!
@user8371915 这可行,但对于 TB 数据仍然很慢,因为我也在运行 170 多列。性能与我之前提到的 freqItems API 相当。处理整个数据大约需要 1 个多小时。【参考方案2】:
执行此操作的最简单方法(自然窗口函数)是编写 SQL。 Spark 自带 SQL 语法,而 SQL 是解决这个问题的绝佳工具。
将您的数据框注册为临时表,然后对其进行分组和窗口化。
spark.sql("""SELECT idx, value, ROW_NUMBER() OVER (PARTITION BY idx ORDER BY c DESC) as r
FROM (
SELECT idx, value, COUNT(*) as c
FROM (SELECT 0 as idx, value FROM df UNION ALL SELECT 1, country FROM df)
GROUP BY idx, value)
HAVING r <= 2""").show()
我想看看是否有任何程序/scala 方法可以让您在没有迭代或循环的情况下执行窗口函数。我不知道 Spark API 中有任何支持它的东西。
顺便说一句,如果您想要包含任意数量的列,那么您可以很容易地使用列列表动态生成内部部分(SELECT 0 as idx, value ... UNION ALL SELECT 1, country
等)。
【讨论】:
谢谢,我已经尝试使用它抱怨 GC 开销的窗口函数。我也会试试你的解决方案。【参考方案3】:鉴于您的最后一个 RDD:
val rdd =
sc.parallelize(
List(
((0, "CT"), 5),
((0, "MA"), 2),
((0, "OH"), 4),
((0, "NY"), 6),
((1, "USA"), 17)
))
rdd.filter(_._1._1 == 0).sortBy(-_._2).take(2).foreach(println)
> ((0,NY),6)
> ((0,CT),5)
rdd.filter(_._1._1 == 1).sortBy(-_._2).take(2).foreach(println)
> ((1,USA),17)
我们首先获取给定列索引 (.filter(_._1._1 == 0)
) 的项目。然后我们按降序对项目进行排序 (.sortBy(-_._2)
)。最后,我们最多取前 2 个元素 (.take(2)
),如果记录的 nbr 小于 2,则只取 1 个元素。
【讨论】:
【参考方案4】:您可以使用 Sparkz 中定义的这个辅助函数映射每个单独的分区,然后将它们组合在一起:
package sparkz.utils
import scala.reflect.ClassTag
object TopElements
def topN[T: ClassTag](elems: Iterable[T])(scoreFunc: T => Double, n: Int): List[T] =
elems.foldLeft((Set.empty[(T, Double)], Double.MaxValue))
case (accumulator@(topElems, minScore), elem) =>
val score = scoreFunc(elem)
if (topElems.size < n)
(topElems + (elem -> score), math.min(minScore, score))
else if (score > minScore)
val newTopElems = topElems - topElems.minBy(_._2) + (elem -> score)
(newTopElems, newTopElems.map(_._2).min)
else accumulator
._1.toList.sortBy(_._2).reverse.map(_._1)
来源:https://github.com/gm-spacagna/sparkz/blob/master/src/main/scala/sparkz/utils/TopN.scala
【讨论】:
以上是关于Spark DataFrame/RDD 中的前 N 个项目的主要内容,如果未能解决你的问题,请参考以下文章
[Spark][Python][DataFrame][RDD]从DataFrame得到RDD的例子
spark Dataframe/RDD 相当于描述中给出的 pandas 命令?