使用 Spark SQL GROUP BY 对 DataFrame 进行高效的 PairRDD 操作
Posted
技术标签:
【中文标题】使用 Spark SQL GROUP BY 对 DataFrame 进行高效的 PairRDD 操作【英文标题】:Efficient PairRDD operations on DataFrame with Spark SQL GROUP BY 【发布时间】:2015-08-03 06:27:43 【问题描述】:这个问题是关于聚合操作时DataFrame
和RDD
之间的对偶性。在 Spark SQL 中,可以使用表生成 UDF 进行自定义聚合,但创建其中之一的用户友好性通常明显低于使用 RDD 可用的聚合函数,尤其是在不需要表输出的情况下。
是否有一种有效的方法可以将 aggregateByKey
等配对 RDD 操作应用于已使用 GROUP BY 分组或使用 ORDERED BY 排序的 DataFrame?
通常,需要一个显式的map
步骤来创建键值元组,例如dataFrame.rdd.map(row => (row.getString(row.fieldIndex("category")), row).aggregateByKey(...)
。这可以避免吗?
【问题讨论】:
【参考方案1】:不是真的。虽然DataFrames
可以转换为RDDs
,反之亦然,但这是相对复杂的操作,DataFrame.groupBy
之类的方法与RDD
上的对应方法的语义不同。
你能得到的最接近的东西是在 Spark 1.6.0 中引入的a new DataSet
API。它提供了与DataFrames
和GroupedDataset
类的更紧密集成,并具有自己的一组方法,包括reduce
、cogroup
或mapGroups
:
case class Record(id: Long, key: String, value: Double)
val df = sc.parallelize(Seq(
(1L, "foo", 3.0), (2L, "bar", 5.6),
(3L, "foo", -1.0), (4L, "bar", 10.0)
)).toDF("id", "key", "value")
val ds = df.as[Record]
ds.groupBy($"key").reduce((x, y) => if (x.id < y.id) x else y).show
// +-----+-----------+
// | _1| _2|
// +-----+-----------+
// |[bar]|[2,bar,5.6]|
// |[foo]|[1,foo,3.0]|
// +-----+-----------+
在某些特定情况下,可以利用Orderable
语义对使用structs
或arrays
的数据进行分组和处理。你会在SPARK DataFrame: select the first row of each group中找到一个例子
【讨论】:
是的,数据集看起来确实很有趣,但 Spark 1.6.0 中的支持仍然有很多缺陷:它们是一个实验性功能。 它是 :) 矛盾的是,Spark DataFrames 在 PySpark 上的表现要比 Scala 好得多。不幸的是,JVM 和 Python 之间的跳跃使事情变得非常昂贵。 @zero323 ,一直在测试您的示例,但收到错误error: value reduce is not a member of org.apache.spark.sql.RelationalGroupedDataset
。我错过了进口吗? (我设法找到的只有 reduce 与 RDD 相关)以上是关于使用 Spark SQL GROUP BY 对 DataFrame 进行高效的 PairRDD 操作的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL 可以在 GROUP BY 聚合中使用 FIRST_VALUE 和 LAST_VALUE(但这不是标准的)
sql语言 怎么求每组最大,就是用group by 分组后,求每组某列最大?
org.apache.spark.sql.AnalysisException:表达式 't2.`sum_click_passed`' 既不在 group by 中,也不是聚合函数