Spark - aggregateByKey 类型不匹配错误

Posted

技术标签:

【中文标题】Spark - aggregateByKey 类型不匹配错误【英文标题】:Spark - aggregateByKey Type mismatch error 【发布时间】:2020-04-13 04:25:10 【问题描述】:

我正在尝试找出这背后的问题。我正在尝试使用aggregateByKey 查找每个学生的最大分数。

val data = spark.sc.Seq(("R1","M",22),("R1","E",25),("R1","F",29),
                        ("R2","M",20),("R2","E",32),("R2","F",52))
                   .toDF("Name","Subject","Marks")
def seqOp = (acc:Int,ele:(String,Int)) => if (acc>ele._2) acc else ele._2
def combOp =(acc:Int,acc1:Int) => if(acc>acc1) acc else acc1

val r = data.rdd.mapcase(t1,t2,t3)=> (t1,(t2,t3)).aggregateByKey(0)(seqOp,combOp)

我收到 aggregateByKey 接受 (Int,(Any,Any)) 但实际是 (Int,(String,Int)) 的错误。

【问题讨论】:

我通过rdd.map case (name, _, marks) => (name, marks) .groupByKey().map(x => (x._1, x._2.max))解决了它。结果:List((R2,52), (R1,29))。我找不到使用aggregateByKey的方法 【参考方案1】:

您的地图函数不正确,因为您输入的是Row,而不是Tuple3

用 : 修正最后一行

val r = data.rdd.map  r =>
      val t1 = r.getAs[String](0)
      val t2 = r.getAs[String](1)
      val t3 = r.getAs[Int](2)
      (t1,(t2,t3))
    .aggregateByKey(0)(seqOp,combOp)

【讨论】:

以上是关于Spark - aggregateByKey 类型不匹配错误的主要内容,如果未能解决你的问题,请参考以下文章

Spark RDD aggregateByKey

Spark算子之aggregateByKey详解

Spark aggregateByKey函数

spark Dataframe 中的 reducebykey 和 aggregatebykey

Spark操作:Aggregate和AggregateByKey

如何解决 Spark 中的“aggregateByKey 不是 org.apache.spark.sql.Dataset 的成员”?