使用 Spark SQL GROUP BY 对 DataFrame 进行高效的 PairRDD 操作

Posted

技术标签:

【中文标题】使用 Spark SQL GROUP BY 对 DataFrame 进行高效的 PairRDD 操作【英文标题】:Efficient PairRDD operations on DataFrame with Spark SQL GROUP BY 【发布时间】:2015-08-03 06:27:43 【问题描述】:

这个问题是关于聚合操作时DataFrameRDD 之间的对偶性。在 Spark SQL 中,可以使用表生成 UDF 进行自定义聚合,但创建其中之一的用户友好性通常明显低于使用 RDD 可用的聚合函数,尤其是在不需要表输出的情况下。

是否有一种有效的方法可以将 aggregateByKey 等配对 RDD 操作应用于已使用 GROUP BY 分组或使用 ORDERED BY 排序的 DataFrame?

通常,需要一个显式的map 步骤来创建键值元组,例如dataFrame.rdd.map(row => (row.getString(row.fieldIndex("category")), row).aggregateByKey(...)。这可以避免吗?

【问题讨论】:

【参考方案1】:

不是真的。虽然DataFrames 可以转换为RDDs,反之亦然,但这是相对复杂的操作,DataFrame.groupBy 之类的方法与RDD 上的对应方法的语义不同。

你能得到的最接近的东西是在 Spark 1.6.0 中引入的a new DataSet API。它提供了与DataFramesGroupedDataset 类的更紧密集成,并具有自己的一组方法,包括reducecogroupmapGroups

case class Record(id: Long, key: String, value: Double)

val df = sc.parallelize(Seq(
    (1L, "foo", 3.0), (2L, "bar", 5.6),
    (3L, "foo", -1.0), (4L, "bar", 10.0)
)).toDF("id", "key", "value")

val ds = df.as[Record]
ds.groupBy($"key").reduce((x, y) => if (x.id < y.id) x else y).show

// +-----+-----------+
// |   _1|         _2|
// +-----+-----------+
// |[bar]|[2,bar,5.6]|
// |[foo]|[1,foo,3.0]|
// +-----+-----------+

在某些特定情况下,可以利用Orderable 语义对使用structsarrays 的数据进行分组和处理。你会在SPARK DataFrame: select the first row of each group中找到一个例子

【讨论】:

是的,数据集看起来确实很有趣,但 Spark 1.6.0 中的支持仍然有很多缺陷:它们是一个实验性功能。 它是 :) 矛盾的是,Spark DataFrames 在 PySpark 上的表现要比 Scala 好得多。不幸的是,JVM 和 Python 之间的跳跃使事情变得非常昂贵。 @zero323 ,一直在测试您的示例,但收到错误 error: value reduce is not a member of org.apache.spark.sql.RelationalGroupedDataset 。我错过了进口吗? (我设法找到的只有 reduce 与 RDD 相关)

以上是关于使用 Spark SQL GROUP BY 对 DataFrame 进行高效的 PairRDD 操作的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL 可以在 GROUP BY 聚合中使用 FIRST_VALUE 和 LAST_VALUE(但这不是标准的)

sql语言 怎么求每组最大,就是用group by 分组后,求每组某列最大?

org.apache.spark.sql.AnalysisException:表达式 't2.`sum_click_passed`' 既不在 group by 中,也不是聚合函数

spark Group By数据框列没有聚合[重复]

关于SQL中两张表联合sum和group by的查询问题

Spark Window 聚合与 Group By/Join 性能