Scala - 对 RDD 上的 Groupby 和 Max

Posted

技术标签:

【中文标题】Scala - 对 RDD 上的 Groupby 和 Max【英文标题】:Scala - Groupby and Max on pair RDD 【发布时间】:2017-05-31 06:59:37 【问题描述】:

我是 spark scala 的新手,想找到每个部门的最高薪水

Dept,Salary
Dept1,1000
Dept2,2000
Dept1,2500
Dept2,1500
Dept1,1700
Dept2,2800

我实现了下面的代码

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf


object MaxSalary 
  val sc = new SparkContext(new SparkConf().setAppName("Max Salary").setMaster("local[2]"))

  case class Dept(dept_name : String, Salary : Int)

  val data = sc.textFile("file:///home/user/Documents/dept.txt").map(_.split(","))
  val recs = data.map(r => (r(0), Dept(r(0), r(1).toInt)))
  val a = recs.max()???????
)

但卡住了如何实现 group by 和 max 功能。我正在使用对 RDD。

谢谢

【问题讨论】:

【参考方案1】:

这可以使用带有以下代码的 RDD 来完成:

val emp = sc.textFile("file:///home/user/Documents/dept.txt")
            .mapPartitionsWithIndex( (idx, row) => if(idx==0) row.drop(1) else row )
            .map(x => (x.split(",")(0).toString, x.split(",")(1).toInt))

val maxSal = emp.reduceByKey(math.max(_,_))

应该给你:

Array[(String, Int)] = Array((Dept1,2500), (Dept2,2800))

【讨论】:

【参考方案2】:

如果你在这里使用数据集是解决方案

case class Dept(dept_name : String, Salary : Int)


val sc = new SparkContext(new SparkConf().setAppName("Max Salary").setMaster("local[2]"))

  val sq = new SQLContext(sc)

  import sq.implicits._
  val file = "resources/ip.csv"

  val data = sc.textFile(file).map(_.split(","))

  val recs = data.map(r => Dept(r(0), r(1).toInt )).toDS()


  recs.groupBy($"dept_name").agg(max("Salary").alias("max_solution")).show()

输出:

+---------+------------+
|dept_name|max_solution|
+---------+------------+
|    Dept2|        2800|
|    Dept1|        2500|
+---------+------------+

【讨论】:

收到错误value toDS is not a member of org.apache.spark.rdd.RDD[MaxSalary.Dept] 你用过 import spark.implicits._ No.. 我需要在 sbt 中写任何东西,因为它返回错误not found: value sqlContext 使用 val sq = new SQLContext(sc) import sq.implicits._ OP 正在使用rdd,如他的问题所述。

以上是关于Scala - 对 RDD 上的 Groupby 和 Max的主要内容,如果未能解决你的问题,请参考以下文章

scala中分组的算子的用法

从 Scala 上的 Spark RDD 对象构建 RDD LabeledPoint

Spark/Scala:仅使用 RDD 使用 ReduceByKey 创建嵌套结构

RDD[Array[Int]] 上的 Scala Reduce() 操作

浅谈Spark RDD

如何使用scala对spark中rdd的每一行进行排序?