将 DATASet Api 与 Spark Scala 结合使用
Posted
技术标签:
【中文标题】将 DATASet Api 与 Spark Scala 结合使用【英文标题】:Using DATASet Api with Spark Scala 【发布时间】:2020-12-30 00:46:32 【问题描述】:您好,我是 spark/Scala 的新手,正在尝试实现一些功能。我的要求非常简单。我必须使用 DataSet API 执行所有操作。
问题 1: 我将 csv 转换为案例类?将数据框转换为 DataSet 的正确方法吗?我做对了吗?
此外,当我尝试对 orderItemFile1 进行转换时,我可以使用 _.order_id 进行过滤/映射操作。但 groupBy 不会发生同样的情况
case class orderItemDetails (order_id_order_item:Int, item_desc:String,qty:Int, sale_value:Int)
val orderItemFile1=ss.read.format("csv")
.option("header",true)
.option("infersSchema",true)
.load("src/main/resources/Order_ItemData.csv").as[orderItemDetails]
orderItemFile1.filter(_.order_id_order_item>100) //Works Fine
orderItemFile1.map(_.order_id_order_item.toInt)// Works Fine
//Error .Inside group By I am unable to access it as _.order_id_order_item. Why So?
orderItemFile1.groupBy(_.order_id_order_item)
//Below Works..But How this will provide compile time safely as committed
//by DataSet Api.I can pass any wrong column name also here and it will be //caught only on run time
orderItemFile1.groupBy(orderItemFile1("order_id_order_item")).agg(sum(orderItemFile1("item_desc")))
【问题讨论】:
【参考方案1】:也许您正在寻找的功能是#groupByKey
。参见示例here。
至于您的第一个问题,基本上是的,您正在将 CSV 读取到 Dataset[A]
中,其中 A
是您声明的案例类。
【讨论】:
以上是关于将 DATASet Api 与 Spark Scala 结合使用的主要内容,如果未能解决你的问题,请参考以下文章
Apache Spark 2.0三种API的传说:RDDDataFrame和Dataset
如何使用 Spark Dataset API (Java) 创建数组列
APACHE SPARK 2.0 API IMPROVEMENTS: RDD, DATAFRAME, DATASET AND SQL
如何使用来自另一个 Dataset<Row> 的记录更新 Dataset<Row>,这些记录在 Spark 中使用 JAVA API 具有相同的模式?