如何在 spark 数据集上使用 group by
Posted
技术标签:
【中文标题】如何在 spark 数据集上使用 group by【英文标题】:how to use group by on spark dataset 【发布时间】:2016-10-15 02:11:59 【问题描述】:我正在使用 Spark 数据集(Spark 1.6.1 版本)。 下面是我的代码
object App
val conf = new SparkConf()
.setMaster("local")
.setAppName("SparkETL")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc);
import sqlContext.implicits._
override def readDataTable(tableName:String):DataFrame=
val dataFrame= App.sqlContext.read.jdbc(JDBC_URL, tableName, JDBC_PROP);
return dataFrame;
case class Student(stud_id , sname , saddress)
case class Student(classid, stud_id, name)
var tbl_student = JobSqlDAO.readDataTable("tbl_student").filter("stud_id = '" + studId + "'").as[Student].as("tbl_student")
var tbl_class_student = JobSqlDAO.readDataTable("tbl_class_student").as[StudentClass].as("tbl_class_student")
var result = tbl_class_student.joinWith(tbl_student, $"tbl_student.stud_id" === $"tbl_class_student.stud_id").as("ff")
现在我想在多个列上执行 group by 子句?
怎么做?
result.groupBy(_._1._1.created_at)
这样可以吗?
如果是,那么我无法将结果视为一个组,如何在多列上执行此操作?
【问题讨论】:
【参考方案1】:如果我正确理解了您的要求,您最好的选择是在PairRDDFunctions 类中使用reduceByKey
函数。
函数的签名是def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
,它只是意味着你使用了一系列的键/值对。
让我解释一下工作流程:
-
您检索要使用的集合(在您的代码中:
result
)
使用 RDD map
函数,您可以将结果集拆分为一个元组,该元组包含两个子元组,其中包含组成键的字段和要聚合的字段(例如:result.map(row => ((row.key1, row.key2), (row.value1, row.value2))
)
现在您有了一个 RDD[(K,V)],其中类型 K 是键字段元组的类型,V 是值字段元组的类型
您可以直接使用reduceByKey
,方法是传递一个(V,V) => V
类型的函数来聚合值(例如:(agg: (Int, Int), val: (Int, Int)) => (agg._1 + val._1, agg._2 + val._2)
)
请注意:
您必须从聚合函数返回相同的值类型 您必须导入org.apache.spark.SparkContext._
才能自动使用 PairRDDFunctions 实用函数
groupBy
也是如此,您必须从起始 RDD 映射到一对 RDD[K,V]
,但您没有聚合函数,因为您只是将值存储在 seq 中以供进一步计算李>
如果您需要聚合的起始值(例如:0 表示计数),请改用 foldByKey
函数
【讨论】:
以上是关于如何在 spark 数据集上使用 group by的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Apache Spark 中的 Group By Operation 形成的每个子集上应用用户定义函数?
Spark CTAS 上的 Hive 使用 Straight SELECT 失败,但使用 SELECT GROUP BY 成功