Apache Scala/Python Spark 2.4.4:按年份分组数据以生成/分析新功能

Posted

技术标签:

【中文标题】Apache Scala/Python Spark 2.4.4:按年份分组数据以生成/分析新功能【英文标题】:Apache Scala/Python Spark 2.4.4: Group data by year range to generate/analyze new feature 【发布时间】:2019-11-18 17:35:45 【问题描述】:

我已经关注了我为特征工程生成的数据框,现在为了驱动另一个特征,我正在尝试创建 purchaseYearRange 列,我想在其中创建一个具有 3 年范围的列并聚合 modelRatio, purchaseRatioitemNo, modelnumber and valueClass columns.

例如:对于 itemNo#7010032,我们将在新的 purchaseYearRange 列和这些年份的 modelRatio, purchaseRatio 值中有一个值为 1995-1996-1997 的行将在相应的行中汇总。接下来,我将在接下来的 3 年中这样做,即 1996-1997-1998、1997-1998-1999 等。

此外,该项目有一行带有itemClass - RGR,对于该行,我们将只有该行带有 1996 年的数据。

基本上,检查数据框中的 3 年数据,如果存在,则对三年范围窗口求和 modelRatio, purchaseRatio。如果不存在三年期数据,则根据数据可用性合计两年或一年。

数据帧

| itemNo|modelnumber|itemClass |purchaseYear|          ModelRatio|      PurchaseRatio|
+-------+-----------+----------+------------+--------------------+-------------------+
|1321457|  A99AA25CA|       ATV|        1995|   1.801325096248545|                2.0|
|7010032|  A99AA25CB|       ATV|        1995|  1.0048348106365834|                2.0|
|7010032|  A99AA25CB|       ATV|        1996|  0.8899632912525741|                2.0|
|7010032|  A99AA25CB|       RGR|        1996|  0.7899632912525741|                1.0| 
|7010032|  A99AA25CB|       ATV|        1997|   1.669710806697108|                2.0|  
|7010032|  A99AA25CB|       ATV|        1998|  0.9982988629241651|                2.0| 
|7010032|  A99AA25CB|       ATV|        1999|0.006535947712418301|                1.0|
|7552901|  A99AA25CD|       ATV|        1995|   37.83901871250784|               12.0| 
|7552901|  A99AA25CD|       ATV|        1996|0.026143790849673203|                1.0|
|7552901|  A99AA25CD|       ATV|        1997|  0.9375951293759512|                2.0|

我是 scala spark 的新手,并使用 .createOrReplaceTempView("test") and then apply SQL operations 进行了尝试,但这种方式非常复杂。你能分享一下我是如何完成它的吗? 请随时使用 Python 或 Scala 提出解决方案

【问题讨论】:

【参考方案1】:

IIUC,您可以使用 Spark SQL Window function 如下:(确保 purchaseYear 是数字或时间戳列)

编辑: 每个 cmets,添加 all_puchase_years 以包括 3 年序列。请注意,ORDER BY itemNo, purchaseYear 子句仅用于演示目的。

spark.sql(""" 
       SELECT itemNo 
       ,      modelnumber 
       ,      itemClass 
       ,      concat_ws('-', sort_array(collect_set(purchaseYear) OVER w1)) AS purchase_years 
       ,      concat_ws('-', sequence(purchaseYear, purchaseYear+2)) AS all_purchase_years
       ,      sum(PurchaseRatio) OVER w1 AS sum_PurchaseRatio 
       ,      sum(ModelRatio) OVER w1 AS sum_ModelRatio 
       FROM test 
       ORDER BY itemNo, purchaseYear 
       WINDOW w1 AS (
           PARTITION BY (itemNo, modelnumber, itemClass)  
           ORDER BY purchaseYear 
           RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING
       ) 
""").show() 
#+-------+-----------+---------+--------------+-----------------+--------------------+
#| itemNo|modelnumber|itemClass|purchase_years|sum_PurchaseRatio|      sum_ModelRatio|
#+-------+-----------+---------+--------------+-----------------+--------------------+
#|1321457|  A99AA25CA|      ATV|          1995|              2.0|   1.801325096248545|
#|7010032|  A99AA25CB|      ATV|1995-1996-1997|              6.0|   3.564508908586266|
#|7010032|  A99AA25CB|      RGR|          1996|              1.0|  0.7899632912525741|
#|7010032|  A99AA25CB|      ATV|1996-1997-1998|              6.0|  3.5579729608738475|
#|7010032|  A99AA25CB|      ATV|1997-1998-1999|              5.0|  2.6745456173336914|
#|7010032|  A99AA25CB|      ATV|     1998-1999|              3.0|  1.0048348106365834|
#|7010032|  A99AA25CB|      ATV|          1999|              1.0|0.006535947712418301|
#|7552901|  A99AA25CD|      ATV|1995-1996-1997|             15.0|   38.80275763273346|
#|7552901|  A99AA25CD|      ATV|     1996-1997|              3.0|  0.9637389202256245|
#|7552901|  A99AA25CD|      ATV|          1997|              2.0|  0.9375951293759512|
#+-------+-----------+---------+--------------+-----------------+--------------------+

【讨论】:

刚刚实现的查询是使用current and 2 following。因此,如果有 2005 年、2007 年和 2008 年的数据,但没有 2006 年的数据,则它会执行 2005-2007-2008 年的数据并对其求和。为避免这种情况,您能否建议我们如何插入0 for missing entries,然后应用上述逻辑,以便始终考虑连续三年并将其总结。例如。在倒数第二行,通过复制 itemNo、modelnumber 和 itemClass 并在 modelRatio 和 purchaseRatio 列中输入 0 来插入缺少 1995 的记录,然后应用上述逻辑。 @SachinSharma,我们使用的是RANGE,其间是偏移量而不是行号,因此它应该会在接下来的 2 年中回升,我相信 2008 年应该被排除在 2005-2007 年之外。 感谢您的及时回复。我正在考虑让purchase_year 有 3 年的时间,以便它在标签上看起来一致。即使数据是两年或一年,purchase_year 始终采用YYYY-YYYY-YYYY 格式,并表示在总结时考虑了哪些年份,因此业务方知道它始终是三年。我正在解决这个问题,但如果它是简单的修复,那么让我知道修复。

以上是关于Apache Scala/Python Spark 2.4.4:按年份分组数据以生成/分析新功能的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark 如何支持不同语言的 API

通过 C# 的 Apache Spark 查询 [关闭]

大数据百科 | 什么是 Apache Spark?

Scala PartialFunction ***

scala python哪个用来开发spark更好

本地Spark连接远程集群Hive(Scala/Python)