Spark Scala: moving average for multiple columns

Posted: 2017-06-27 14:43:17

Question:

Input:

val customers = sc.parallelize(List(("Alice", "2016-05-01", 50.00,4),
                                ("Alice", "2016-05-03", 45.00,2),
                                ("Alice", "2016-05-04", 55.00,4),
                                ("Bob", "2016-05-01", 25.00,6),
                                ("Bob", "2016-05-04", 29.00,7),
                                ("Bob", "2016-05-06", 27.00,10))).
                           toDF("name", "date", "amountSpent","NumItems")

Program:

 // Import the window functions.
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._

 // Create a window spec.
 val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)

In this window spec, the data is partitioned by customer, and each customer's rows are ordered by date. The window frame is defined to start at -1 (one row before the current row) and end at 1 (one row after the current row), so the sliding window spans 3 rows in total. The task is to compute a window-based sum over a list of columns; here they are "amountSpent" and "NumItems", but in practice there could be hundreds of such columns.

Below is a solution that computes the window sum column by column. How can this be done more efficiently, so that the sliding-window rows do not have to be looked up again for every column?

 // Calculate the sum of spent
 customers.withColumn("sumSpent",sum(customers("amountSpent")).over(wSpec1)).show()

  +-----+----------+-----------+--------+--------+
  | name|      date|amountSpent|NumItems|sumSpent|
  +-----+----------+-----------+--------+--------+
  |Alice|2016-05-01|       50.0|       4|    95.0|
  |Alice|2016-05-03|       45.0|       2|   150.0|
  |Alice|2016-05-04|       55.0|       4|   100.0|
  |  Bob|2016-05-01|       25.0|       6|    54.0|
  |  Bob|2016-05-04|       29.0|       7|    81.0|
  |  Bob|2016-05-06|       27.0|      10|    56.0|
  +-----+----------+-----------+--------+--------+

 // Calculate the sum of items
 customers.withColumn( "sumItems",
                sum(customers("NumItems")).over(wSpec1)  ).show()

  +-----+----------+-----------+--------+--------+
  | name|      date|amountSpent|NumItems|sumItems|
  +-----+----------+-----------+--------+--------+
  |Alice|2016-05-01|       50.0|       4|       6|
  |Alice|2016-05-03|       45.0|       2|      10|
  |Alice|2016-05-04|       55.0|       4|       6|
  |  Bob|2016-05-01|       25.0|       6|      13|
  |  Bob|2016-05-04|       29.0|       7|      23|
  |  Bob|2016-05-06|       27.0|      10|      17|
  +-----+----------+-----------+--------+--------+

【Comments】:

Do you need a separate column for each moving sum?

All the aggregated columns can be in a separate dataframe.

@HappyCoding please upvote/accept if the answer helped you.

【Answer 1】:

At the moment, I believe, it is not possible to update multiple columns in a single Window function call. You can make it appear to happen all at once, as below:

val customers = sc.parallelize(List(("Alice", "2016-05-01", 50.00,4),
  ("Alice", "2016-05-03", 45.00,2),
  ("Alice", "2016-05-04", 55.00,4),
  ("Bob", "2016-05-01", 25.00,6),
  ("Bob", "2016-05-04", 29.00,7),
  ("Bob", "2016-05-06", 27.00,10))).
  toDF("name", "date", "amountSpent","NumItems")

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Create a window spec.
val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
var tempdf = customers
val colNames = List("amountSpent", "NumItems")
// add one windowed-sum column per input column, reusing the same window spec
for(column <- colNames)
  tempdf = tempdf.withColumn(column+"Sum", sum(tempdf(column)).over(wSpec1))

tempdf.show(false)

Your output should be:

+-----+----------+-----------+--------+--------------+-----------+
|name |date      |amountSpent|NumItems|amountSpentSum|NumItemsSum|
+-----+----------+-----------+--------+--------------+-----------+
|Bob  |2016-05-01|25.0       |6       |54.0          |13         |
|Bob  |2016-05-04|29.0       |7       |81.0          |23         |
|Bob  |2016-05-06|27.0       |10      |56.0          |17         |
|Alice|2016-05-01|50.0       |4       |95.0          |6          |
|Alice|2016-05-03|45.0       |2       |150.0         |10         |
|Alice|2016-05-04|55.0       |4       |100.0         |6          |
+-----+----------+-----------+--------+--------------+-----------+
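As a side note, the loop above can also be written without the mutable var by folding the column list over the DataFrame; a minimal sketch, with behavior identical to the loop:

// Equivalent to the for-loop, but immutable: fold over the column list,
// adding one windowed-sum column per step
val result = colNames.foldLeft(customers) { (df, c) =>
  df.withColumn(c + "Sum", sum(df(c)).over(wSpec1))
}
result.show(false)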


【Answer 2】:

Yes, it is possible to compute the window only once (if you are on Spark 2, which lets you use collect_list with struct types). Assuming you have the dataframe and window spec from your code:

val colNames = List("amountSpent","NumItems")
val cols = colNames.map(col(_))

// put the window content of all columns into one struct per row
val df_wc_arr = customers
  .withColumn("window_content_arr", collect_list(struct(cols:_*)).over(wSpec1))

// aggregation expressions used later: sum each field of the struct,
// one expression per column
val aggExpr = colNames.map(n => sum(col("window_content."+n)).as(n+"Sum"))

df_wc_arr
  .withColumn("window_content", explode($"window_content_arr"))
  .drop($"window_content_arr")
  .groupBy(($"name" :: $"date" :: cols):_*)
  .agg(aggExpr.head, aggExpr.tail:_*)
  .orderBy($"name", $"date")
  .show

which gives:

+-----+----------+-----------+--------+--------------+-----------+
| name|      date|amountSpent|NumItems|amountSpentSum|NumItemsSum|
+-----+----------+-----------+--------+--------------+-----------+
|Alice|2016-05-01|       50.0|       4|          95.0|          6|
|Alice|2016-05-03|       45.0|       2|         150.0|         10|
|Alice|2016-05-04|       55.0|       4|         100.0|          6|
|  Bob|2016-05-01|       25.0|       6|          54.0|         13|
|  Bob|2016-05-04|       29.0|       7|          81.0|         23|
|  Bob|2016-05-06|       27.0|      10|          56.0|         17|
+-----+----------+-----------+--------+--------------+-----------+
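If the explode/groupBy round trip feels heavyweight, the per-column window expressions can also be issued in a single select instead of repeated withColumn calls; a minimal sketch, assuming the same customers, colNames and wSpec1 as above:

// One windowed-sum expression per column, selected together with all
// original columns; the expressions share one window spec, so Spark
// should plan them over a single Window operator
val sumCols = colNames.map(c => sum(col(c)).over(wSpec1).as(c + "Sum"))
customers.select((col("*") +: sumCols):_*).show(false)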

