如何解决 Spark 中的“aggregateByKey 不是 org.apache.spark.sql.Dataset 的成员”?

Posted

技术标签:

【中文标题】如何解决 Spark 中的“aggregateByKey 不是 org.apache.spark.sql.Dataset 的成员”?【英文标题】:How to solve "aggregateByKey is not a member of org.apache.spark.sql.Dataset" in Spark? 【发布时间】:2019-04-02 13:27:16 【问题描述】:

我正在尝试这个例子:

https://backtobazics.com/big-data/spark/apache-spark-aggregatebykey-example/

但我使用的是数据框,而不是 RDD。

我尝试了以下方法:

val aggrRDD = student_df.map(r => (r.getString(0), (r.getString(1), r.getInt(2))))
                       .aggregateByKey(zeroVal)(seqOp, combOp) 

这是这段代码 sn-p 的一部分:



val student_df = sc.parallelize(Array(
    ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), ("Joseph", "Biology", 82), 
    ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62), ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), 
    ("Tina", "Maths", 78), ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87), 
    ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), ("Thomas", "Biology", 74), 
    ("Cory", "Maths", 56), ("Cory", "Physics", 65), ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), 
    ("Jackeline", "Maths", 86), ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83), 
    ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), ("Juan", "Biology", 60)), 3).toDF("student", "subject", "marks")


def seqOp = (accumulator: Int, element: (String, Int)) => 
    if(accumulator > element._2) accumulator else element._2

def combOp = (accumulator1: Int, accumulator2: Int) => 
    if(accumulator1 > accumulator2) accumulator1 else accumulator2


val zeroVal = 0

val aggrRDD = student_df.map(r => (r.getString(0), (r.getString(1), r.getInt(2))))
                       .aggregateByKey(zeroVal)(seqOp, combOp) 


这给出了这个错误:

error: value aggregateByKey is not a member of org.apache.spark.sql.Dataset[(String, (String, Int))]

一个可能的原因是value aggregateByKey之前缺少一个分号?

我在这里做错了什么?我如何使用数据框或数据集?

【问题讨论】:

【参考方案1】:

尝试在 student_df 之后和 map 之前调用 rdd:

val aggrRDD = student_df.rdd.map(r => (r.getString(0), (r.getString(1), r.getInt(2))))
          .aggregateByKey(zeroVal)(seqOp, combOp)

【讨论】:

非常感谢,超级好用……我对此感到困惑……***.com/questions/45620797/… 如何处理这种情况...***.com/questions/55473276/… 我可以看到里面第一个链接的答案,第二个我提供了我的解决方案。 你能看看这个***.com/questions/55037648/… 我可以为你推荐这篇文章 - medium.com/appsflyer/salting-your-spark-to-scale-e6f1c87dd18

以上是关于如何解决 Spark 中的“aggregateByKey 不是 org.apache.spark.sql.Dataset 的成员”?的主要内容,如果未能解决你的问题,请参考以下文章

我们应该如何解决 Spark 的 sbt 文件中的本地依赖关系

调优之道——如何解决Spark的数据倾斜问题

如何修改 Spark 数据框中的 numpy 数组?

如何解决 com.mongodb.spark.exceptions.MongoTypeConversionException:无法转换... Java Spark

Spark“ExecutorLostFailure” - 如何解决?

Spark程序的Map函数中的空指针异常