Scala Spark groupBy/Agg 函数

Posted

技术标签:

【中文标题】Scala Spark groupBy/Agg 函数【英文标题】:Scala Spark groupBy/Agg functions 【发布时间】:2020-06-26 13:39:31 【问题描述】:

我有两个数据集需要加入并对其执行操作,但我不知道该怎么做。 对此的规定是我没有可用的 org.apache.spark.sql.functions 方法,因此必须使用数据集 API

给定的输入是两个数据集 第一个数据集是带有字段的客户类型: customerId, forename, surname - 所有字符串

第二个数据集是Transaction: customerId(String)、accountId(String)、金额(Long)

customerId 是链接

输出的Dataset需要有这些字段: customerId(String), forename(String), surname(String), transactions(Transaction类型的列表), transactionCount(int), totalTransactionAmount(Double),averageTransactionAmount(Double)

我知道我需要在最后使用 groupBy、agg 和某种 join。 任何人都可以帮助/指出我正确的方向吗?谢谢

【问题讨论】:

当您提供的信息如此之少时,很难提供帮助,您能否添加一些详细信息,例如输入、预期输出。数据框的架构? 为什么没有 org.apache.spark.sql.functions._ 可用? @werner 练习中规定不能使用,必须使用Dataset API @koiralo 抱歉,我添加了字段的输入/输出类型,给定的输入是两个数据集,一个是 DataSet[Customer],另一个是 Dataset[Transaction] 【参考方案1】:

使用您拥有的信息非常困难,但据我了解,您不想使用数据框功能而是使用数据集 api 实现所有内容,您可以通过以下方式执行此操作

    使用 joinWith 连接两个数据集,您可以在此处找到示例 https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins.html#joinWith

    聚合:我会使用 groupByKey 后跟 mapGroups 类似

 ds.groupByKey(x=>x.id).mapGroups  case (key,iter) =>  
        val list = iter.toList
        val totalTransactionAmount = ???
        val averageTransactionAmount = ??? 
        (key,totalTransactionAmount,averageTransactionAmount)
    
 

希望该示例能够让您了解如何使用数据集 API 解决问题,并使其适应您的问题。

【讨论】:

谢谢,这个 groupBy 和 mapGroups 帮助我得到了答案

以上是关于Scala Spark groupBy/Agg 函数的主要内容,如果未能解决你的问题,请参考以下文章

spark sql DataFrame 的 groupBy+agg 与 groupByKey+mapGroups

spark sql DataFrame 的 groupBy+agg 与 groupByKey+mapGroups

Apache Spark Dataframe Groupby agg() 用于多列

python pandas, DF.groupby().agg(), agg() 中的列引用

Pandas groupby agg - 如何获得计数?

求教: Spark的dataframe 怎么改列的名字,比如列名 SUM(_c1) 改成c1