根据scala中的条件对列进行火花数据框聚合

Posted

技术标签:

【中文标题】根据scala中的条件对列进行火花数据框聚合【英文标题】:spark dataframe aggregation of column based on condition in scala 【发布时间】:2020-02-03 09:44:15 【问题描述】:

我有以下格式的 csv 数据。

我需要找到 2017 年营业额超过 100 的前 2 供应商。

Turnover= Sum(Invoices which status is Paid-in-Full ) - Sum(Invoices 其状态为 Exception 或 Rejected)

我已经在 datebricks scala notebook 中加载了来自 csv 的数据,如下所示:

val invoices_data = spark.read.format(file_type)
                  .option("header", "true")
                  .option("dateFormat", "M/d/yy")
                  .option("inferSchema", "true")
                 .load("invoice.csv")

然后我尝试按供应商名称进行分组

val avg_invoice_by_vendor = invoices_data.groupBy("VendorName")

但现在我不知道该怎么做。

这是示例 csv 数据。

Id     InvoiceDate      Status         Invoice   VendorName
    2   2/23/17         Exception       23        V1
    3   11/23/17        Paid-in-Full    56        V1
    1   12/20/17        Paid-in-Full    12        V1
    5   8/4/19          Paid-in-Full    123       V2
    6   2/6/17          Paid-in-Full    237       V2
    9   3/9/17          Rejected        234       V2
    7   4/23/17         Paid-in-Full    78        V3
    8   5/23/17         Exception       345       V4

【问题讨论】:

【参考方案1】:

您可以使用 udf 签署发票,具体取决于状态和使用 sum 函数对聚合 df 进行分组后:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DateType
def signInvoice: (String, Int) => Int = (status: String, invoice: Int) => 
  status match 
    case "Exception" | "Rejected" => -invoice
    case "Paid-in-Full" => invoice
    case _ => throw new IllegalStateException("wrong status")
  


val signInvoiceUdf = spark.udf.register("signInvoice", signInvoice)
val top2_vendorsDF = invoices_data
  .withColumn("InvoiceDate", col("InvoiceDate").cast(DateType))
  .filter(year(col("InvoiceDate")) === lit(2017))
  .withColumn("Invoice", col("Invoice").as[Int])
  .groupBy("VendorName")
  .agg(sum(signInvoiceUdf('Status, 'Invoice)).as("sum_invoice"))
  .filter(col("sum_invoice") > 100)
  .orderBy(col("sum_invoice").desc)
  .take(2)

【讨论】:

【参考方案2】:

我已经使用pivot方法解决了上述问题。

invoices_data
              .filter(invoices_data("InvoiceStatusDesc") === "Paid-in-Full" || 
                invoices_data("InvoiceStatusDesc") === "Exception" ||
                invoices_data("InvoiceStatusDesc") === "Rejected")
              .filter(year(to_date(invoices_data("InvoiceDate"), "M/d/yy")) === 2017)
              .groupBy("InvoiceVendorName").pivot("InvoiceStatusDesc").sum("InvoiceTotal")

【讨论】:

以上是关于根据scala中的条件对列进行火花数据框聚合的主要内容,如果未能解决你的问题,请参考以下文章

如何聚合数据框并通过 r 中的重复行对列的值求和

如何在火花聚合函数中实现scala类型安全

熊猫数据框根据名称对列进行分组并应用函数

在火花数据框中聚合期间过滤数组值

根据火花数据框scala中的列值过滤行

Spark:在scala中的数据帧上使用动态过滤器进行聚合