spark dataframe aggregation of column based on condition in scala

Posted: 2020-02-03 09:44:15

Question: I have csv data in the following format.
I need to find the top 2 vendors with turnover over 100 in 2017.

Turnover = Sum(invoices whose status is Paid-in-Full) - Sum(invoices whose status is Exception or Rejected)

I have loaded the data from the csv in a Databricks Scala notebook as follows:
// file_type is defined elsewhere in the notebook (for this file it would be "csv")
val invoices_data = spark.read.format(file_type)
  .option("header", "true")
  .option("dateFormat", "M/d/yy")
  .option("inferSchema", "true")
  .load("invoice.csv")
Then I tried to group by vendor name:

val avg_invoice_by_vendor = invoices_data.groupBy("VendorName")

But I don't know how to proceed from here.

Here is the sample csv data:
Id  InvoiceDate  Status        Invoice  VendorName
2   2/23/17      Exception     23       V1
3   11/23/17     Paid-in-Full  56       V1
1   12/20/17     Paid-in-Full  12       V1
5   8/4/19       Paid-in-Full  123      V2
6   2/6/17       Paid-in-Full  237      V2
9   3/9/17       Rejected      234      V2
7   4/23/17      Paid-in-Full  78       V3
8   5/23/17      Exception     345      V4
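For reference, applying the turnover definition to this sample by hand: V1 = 56 + 12 - 23 = 45, V2 = 237 - 234 = 3 (the 123 invoice falls in 2019), V3 = 78, V4 = -345. So no vendor in this particular sample actually clears the 100 threshold; the sample is only illustrative.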
Answer 1: You can use a udf that signs the invoice amount depending on the status, then group and aggregate with the sum function:
import org.apache.spark.sql.functions._
import spark.implicits._ // enables the 'Status / 'Invoice symbol syntax

def signInvoice: (String, Int) => Int = (status: String, invoice: Int) =>
  status match {
    case "Exception" | "Rejected" => -invoice
    case "Paid-in-Full" => invoice
    case _ => throw new IllegalStateException("wrong status")
  }

val signInvoiceUdf = spark.udf.register("signInvoice", signInvoice)

val top2_vendorsDF = invoices_data
  // a plain cast(DateType) cannot parse "2/23/17", so parse with an explicit pattern
  .withColumn("InvoiceDate", to_date(col("InvoiceDate"), "M/d/yy"))
  .filter(year(col("InvoiceDate")) === lit(2017))
  .withColumn("Invoice", col("Invoice").cast("int"))
  .groupBy("VendorName")
  .agg(sum(signInvoiceUdf('Status, 'Invoice)).as("sum_invoice"))
  .filter(col("sum_invoice") > 100)
  .orderBy(col("sum_invoice").desc)
  .take(2) // take returns an Array[Row], not a DataFrame
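As a side note (not part of the original answer): the same signing logic can be expressed with the built-in when/otherwise column functions instead of a UDF, which keeps the expression visible to the Catalyst optimizer. A minimal sketch using the sample's column names:

import org.apache.spark.sql.functions._

// Rows with any other status become null and are ignored by sum.
val signedInvoice = when(col("Status").isin("Exception", "Rejected"), -col("Invoice"))
  .when(col("Status") === "Paid-in-Full", col("Invoice"))

val top2Vendors = invoices_data
  .withColumn("InvoiceDate", to_date(col("InvoiceDate"), "M/d/yy"))
  .filter(year(col("InvoiceDate")) === 2017)
  .groupBy("VendorName")
  .agg(sum(signedInvoice).as("turnover"))
  .filter(col("turnover") > 100)
  .orderBy(col("turnover").desc)
  .limit(2)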
Answer 2: I have solved the above problem using the pivot method.
// Note: the column names here (InvoiceStatusDesc, InvoiceVendorName, InvoiceTotal)
// appear to come from the asker's full dataset rather than the simplified sample above.
invoices_data
  .filter(invoices_data("InvoiceStatusDesc") === "Paid-in-Full" ||
          invoices_data("InvoiceStatusDesc") === "Exception" ||
          invoices_data("InvoiceStatusDesc") === "Rejected")
  .filter(year(to_date(invoices_data("InvoiceDate"), "M/d/yy")) === 2017)
  .groupBy("InvoiceVendorName")
  .pivot("InvoiceStatusDesc")
  .sum("InvoiceTotal")