spark sql DataFrame 的 groupBy+agg 与 groupByKey+mapGroups
Posted nefu-ljw
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark sql DataFrame 的 groupBy+agg 与 groupByKey+mapGroups相关的知识,希望对你有一定的参考价值。
groupBy 对比 groupByKey
groupBy: groupBy类似于传统SQL语言中的group by子语句,但比较不同的是groupBy()可以带多个列名,对多个列进行group。比如想根据 “id” 和 “name” 进行 groupBy 的话可以
df.groupBy("id","name")
groupBy返回的类型是RelationalGroupedDataset。
groupByKey: groupByKey则更加灵活,可以根据用户自己对列的组合来进行groupBy,比如上面的那个例子,根据 “id” 和 “name” 进行 groupBy,使用groupByKey可以这样。
//同前面的goupBy效果是一样的,但返回的类型是不一样的
df.toDF("id","name").groupByKey(row =>
row.getString(0) + row.getString(1)
)
但和groupBy不同的是groupByKey返回的类型是KeyValueGroupedDataset。
groupByKey + mapGroups / groupBy + agg
https://stackoverflow.com/questions/49291397/spark-mapgroups-on-a-dataset
df.groupByKey(t => (t.A, t.B)) // 以(属性A,属性B)分组
.mapGroups...=>...
.toDF()
可以等价于
df.groupBy($"A", $"B") // 以(属性A,属性B)分组
.agg...=>...
.toDF()
(1)groupByKey + mapGroups
val resultSum = df.as[Record].map(row => ((row.Hour,row.Category),(row.TotalComm,row.TotalValue)))
.groupByKey(_._1).mapGroupscase(k,iter) =>
val listBuffer1 = new ListBuffer[Double]
val listBuffer2 = new ListBuffer[Int]
for(a <- iter)
listBuffer1 += a._2._1
listBuffer2 += a._2._2
(k._1, k._2, listBuffer1.sum, listBuffer2.sum)
.toDF("KeyHour","KeyCategory","TotalComm","TotalValue")
.orderBy($"KeyHour".asc)
(2)groupBy + agg
val resultSum = df.groupBy($"Hour", $"Category")
.agg(sum($"TotalComm").as("TotalComm"), sum($"TotalValue").as("TotalValue"))
.orderBy(asc("Hour"))
以上是关于spark sql DataFrame 的 groupBy+agg 与 groupByKey+mapGroups的主要内容,如果未能解决你的问题,请参考以下文章
值 createGlobalTempView 不是 apache.org.spark.sql.DataFrame 的成员