Apache Spark - 处理临时 RDD 上的滑动窗口

Posted

技术标签:

【中文标题】Apache Spark - 处理临时 RDD 上的滑动窗口【英文标题】:Apache Spark - Dealing with Sliding Windows on Temporal RDDs 【发布时间】:2014-12-17 12:07:21 【问题描述】:

在过去的几个月里,我一直在使用 Apache Spark,但现在我收到了一项相当艰巨的任务,即在配对的 RDD 上计算滑动窗口上的平均值/最小值/最大值等,其中关键组件是一个日期标签,值组件是一个矩阵。因此,每个聚合函数还应返回一个矩阵,其中每个单元格的平均值为该时间段内所有该单元格的平均值。

我希望能够说我想要每 7 天的平均值,滑动窗口为 1 天。滑动窗口的移动单位总是1,然后是窗口大小的单位(所以如果是每12周,窗口移动单位是1)。

我现在最初的想法是简单地迭代,如果我们想要每 X 天 X 次的平均值,并且每次只需按日期对元素进行分组,并带有偏移量。

所以如果我们有这种情况:

天数:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

矩阵:A B C D E F G H I J K L M N O

我们想要每 5 天的平均值,我将迭代 5 次并在此处显示分组:

第一次迭代:

第 1 组:(1, A) (2, B) (3, C) (4, D) (5, E)

第 2 组:(6, F) (7, G) (8, H) (9, I) (10, J)

第 3 组:(11, K) (12, L) (13, M) (14, N) (15, O)

第二次迭代:

第 1 组:(2, B) (3, C) (4, D) (5, E) (6, F)

第 2 组:(7, G) (8, H) (9, I) (10, J), (11, K)

第 3 组:(12, L) (13, M) (14, N) (15, O)

等等,对于每个组,我必须做一个折叠/减少程序来获得平均值。

但是,正如您想象的那样,这非常慢,并且可能是一种相当糟糕的方法。不过,我真的想不出更好的方法。

【问题讨论】:

我假设在第一次迭代中,您同时处理每个组?当我第一次开始使用 Spark 时,我有一个坏习惯,即创建一个代表第 1 组(迭代 1)的新 RDD,并使用 map/reduce 来查找 min/max/average,而不是对整个原始 RDD 进行操作。从本质上讲,这耗尽了 Spark 的所有力量。我不在乎承认我花了多长时间才意识到自己在做什么。 这是我在这个主题上见过的最好的尝试:***.com/questions/23402303/… @TravisJ 我按顺序迭代,然后按键对它们进行分组,然后同时减少。那是对的吧?如果您认为查看代码会有所帮助,我可以发布它吗? @maasg 感谢您的链接! 我不知道我在阅读 scala 时会非常有效。听起来不错。如果您将 (1, 2, 3, 4, 5) 映射到 key=Group 1,(6, 7, 8, 9, 10) 映射到 key=Group 2,则应该是正确的,然后按 key 减少。 (map 和 reduce 并行应用。)我的坏习惯是做类似 newRDD = filter().reduce() 的事情,其中​​过滤选择了第 1 组中的元素,然后 reduce 找到了 min/max/ avg,然后再次过滤(串行)下一组。本质上我只是使用并行性来计算平均值,找到最小值/最大值。真的很糟糕。 【参考方案1】:

如果您转换为 DataFrame,这一切都会变得简单得多——您只需将数据自行连接回自身并找到平均值。假设我有一系列这样的数据:

tsDF.show
date       amount
1970-01-01 10.0
1970-01-01 5.0
1970-01-01 7.0
1970-01-02 14.0
1970-01-02 13.9
1970-01-03 1.0
1970-01-03 5.0
1970-01-03 9.0
1970-01-04 9.0
1970-01-04 5.8
1970-01-04 2.8
1970-01-04 8.9
1970-01-05 8.1
1970-01-05 2.1
1970-01-05 2.78
1970-01-05 20.78

汇总如下:

tsDF.groupBy($"date").agg($"date", sum($"amount"), count($"date")).show
date       SUM(amount) COUNT(date)
1970-01-01 22.0        3
1970-01-02 27.9        2
1970-01-03 15.0        3
1970-01-04 26.5        4
1970-01-05 33.76       4

然后我需要创建一个 UDF 来移动连接条件的日期(注意我只使用 offset = -2 来使用 2 天的窗口):

def dateShift(myDate: java.sql.Date): java.sql.Date = 
  val offset = -2;
  val cal = Calendar.getInstance;
  cal.setTime(myDate);
  cal.add(Calendar.DATE, offset);
  new java.sql.Date(cal.getTime.getTime)

val udfDateShift = udf[java.sql.Date,java.sql.Date](dateShift)

然后我可以很容易地找到这样的 2 天滚动平均值:

val windowDF = tsDF.select($"date")
  .groupBy($"date")
  .agg($"date")
  .join(
    tsDF.select($"date" as "r_date", $"amount" as "r_amount"),
    $"r_date" > udfDateShift($"date") and $"r_date" <= $"date"
  )
  .groupBy($"date")
  .agg($"date",avg($"r_amount") as "2 day avg amount / record")

val windowDF.show
date       2 day avg amount / record
1970-01-01 7.333333333333333
1970-01-02 9.98
1970-01-03 8.58
1970-01-04 5.928571428571429
1970-01-05 7.5325

虽然这并不是您想要做的,但您会看到如何使用 DataFrame 自连接从数据集中提取运行平均值。希望对您有所帮助。

【讨论】:

这似乎不能很好地推广到更大的窗口大小,因为窗口大小例如7 意味着 6 个连接,对吧?在 MLib 中使用 Sliding 可能会更好:issues.apache.org/jira/browse/SPARK-1241 实际上不,该窗口可在 UDF 中配置——offset -2 将其设置为过去两天开始的窗口。您还可以使 UDF 采用 Int 即窗口,甚至可能进行连接,以便根据某些动态条件使窗口有所不同。话虽如此,我从未使用过 MLib——从没想过我对机器学习感兴趣——所以不知道 Sliding,但会看看它——谢谢! 我明白了,我并没有掌握连接的实际工作原理。谢谢。 我会稍微整理一下我的答案,希望能让我在做什么更清楚一点。

以上是关于Apache Spark - 处理临时 RDD 上的滑动窗口的主要内容,如果未能解决你的问题,请参考以下文章

Spark核心编程---创建RDD

使用 Apache Spark 将 RDD 写入文本文件

在 Apache Spark 中将批处理 RDD 的结果与流式 RDD 相结合

对同一个 apache Spark RDD 的操作会导致所有语句重新执行

toDF() 不处理 RDD

spark版本定制十八:Spark Streaming中空RDD处理及流处理程序优雅的停止