spark sql DataFrame 的 groupBy+agg 与 groupByKey+mapGroups

Posted nefu-ljw

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark sql DataFrame 的 groupBy+agg 与 groupByKey+mapGroups相关的知识,希望对你有一定的参考价值。

groupBy 对比 groupByKey

groupBy: groupBy类似于传统SQL语言中的group by子语句,但比较不同的是groupBy()可以带多个列名,对多个列进行group。比如想根据 “id” 和 “name” 进行 groupBy 的话可以

df.groupBy("id","name")

groupBy返回的类型是RelationalGroupedDataset。

groupByKey: groupByKey则更加灵活,可以根据用户自己对列的组合来进行groupBy,比如上面的那个例子,根据 “id” 和 “name” 进行 groupBy,使用groupByKey可以这样。

//同前面的goupBy效果是一样的,但返回的类型是不一样的
df.toDF("id","name").groupByKey(row =>
    row.getString(0) + row.getString(1)
)

但和groupBy不同的是groupByKey返回的类型是KeyValueGroupedDataset。

groupByKey + mapGroups / groupBy + agg

https://stackoverflow.com/questions/49291397/spark-mapgroups-on-a-dataset

df.groupByKey(t => (t.A, t.B)) // 以(属性A,属性B)分组
.mapGroups...=>...
.toDF()

可以等价于

df.groupBy($"A", $"B") // 以(属性A,属性B)分组
.agg...=>...
.toDF()

(1)groupByKey + mapGroups

val resultSum = df.as[Record].map(row => ((row.Hour,row.Category),(row.TotalComm,row.TotalValue)))
  .groupByKey(_._1).mapGroupscase(k,iter) => 
  val listBuffer1 = new ListBuffer[Double]
  val listBuffer2 = new ListBuffer[Int]
      for(a <- iter)
        listBuffer1 += a._2._1
        listBuffer2 += a._2._2
      
      (k._1, k._2, listBuffer1.sum, listBuffer2.sum)
    
  .toDF("KeyHour","KeyCategory","TotalComm","TotalValue")
  .orderBy($"KeyHour".asc)

(2)groupBy + agg

val resultSum = df.groupBy($"Hour", $"Category")
  .agg(sum($"TotalComm").as("TotalComm"), sum($"TotalValue").as("TotalValue"))
  .orderBy(asc("Hour"))

以上是关于spark sql DataFrame 的 groupBy+agg 与 groupByKey+mapGroups的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL中的DataFrame的创建

java的怎么操作spark的dataframe

Spark-SQL之DataFrame操作大全

值 createGlobalTempView 不是 apache.org.spark.sql.DataFrame 的成员

DataFrame编程模型初谈与Spark SQL

DataFrame DataSet Spark SQL学习