将 DATASet Api 与 Spark Scala 结合使用

Posted

技术标签:

【中文标题】将 DATASet Api 与 Spark Scala 结合使用【英文标题】:Using DATASet Api with Spark Scala 【发布时间】:2020-12-30 00:46:32 【问题描述】:

您好,我是 spark/Scala 的新手,正在尝试实现一些功能。我的要求非常简单。我必须使用 DataSet API 执行所有操作。

问题 1: 我将 csv 转换为案例类?将数据框转换为 DataSet 的正确方法吗?我做对了吗?

此外,当我尝试对 orderItemFile1 进行转换时,我可以使用 _.order_id 进行过滤/映射操作。但 groupBy 不会发生同样的情况

 case class  orderItemDetails (order_id_order_item:Int, item_desc:String,qty:Int,   sale_value:Int)
    val orderItemFile1=ss.read.format("csv")
        .option("header",true)
        .option("infersSchema",true)
        .load("src/main/resources/Order_ItemData.csv").as[orderItemDetails]

         orderItemFile1.filter(_.order_id_order_item>100) //Works Fine
         orderItemFile1.map(_.order_id_order_item.toInt)// Works Fine

//Error .Inside group By I am unable to  access it as _.order_id_order_item. Why So?
   orderItemFile1.groupBy(_.order_id_order_item) 

//Below Works..But How this will provide compile time safely as committed 
//by DataSet Api.I can pass any wrong column name also here and it will be //caught only on run time

orderItemFile1.groupBy(orderItemFile1("order_id_order_item")).agg(sum(orderItemFile1("item_desc")))

【问题讨论】:

【参考方案1】:

也许您正在寻找的功能是#groupByKey。参见示例here。

至于您的第一个问题,基本上是的,您正在将 CSV 读取到 Dataset[A] 中,其中 A 是您声明的案例类。

【讨论】:

以上是关于将 DATASet Api 与 Spark Scala 结合使用的主要内容,如果未能解决你的问题,请参考以下文章

RDD, DataFrame,DataSet区别与相互转化

Apache Spark 2.0三种API的传说:RDDDataFrame和Dataset

如何使用 Spark Dataset API (Java) 创建数组列

APACHE SPARK 2.0 API IMPROVEMENTS: RDD, DATAFRAME, DATASET AND SQL

如何使用来自另一个 Dataset<Row> 的记录更新 Dataset<Row>,这些记录在 Spark 中使用 JAVA API 具有相同的模式?

Spark基础学习笔记23:DataFrame与Dataset