在 Spark Dataframe 中实现 Window 的重叠分区
Posted
技术标签:
【中文标题】在 Spark Dataframe 中实现 Window 的重叠分区【英文标题】:Achieve overlapping partitions for Window in Spark Dataframe 【发布时间】:2019-07-31 07:49:23 【问题描述】:我的情况如下:
我有一个由符号(分类)值的时间序列组成的数据框。它看起来类似于:
idx symbol partition
0 A 0
1 C 0
2 B 0
3 C 0
4 A 0
5 C 1
6 B 1
7 D 1
8 C 1
9 B 1
我现在的目标是制作一个滑动窗口并将 n 个前导值收集到一个数组中。
我通过以下方式实现了这一目标:
sliding_window = Window.partitionBy("partition").orderBy("idx").rowsBetween(Window.currentRow, 2)
sliding_df = df.withColumn("sliding", collect_list("symbol").over(sliding_window))
这导致以下数据框:
idx symbol partition sliding
0 A 0 [A, C, B]
1 C 0 [C, B, C]
2 B 0 [B, C, A]
3 C 0 [C, A]
4 A 0 [A]
5 C 1 [C, B, D]
6 B 1 [B, D, C]
7 D 1 [D, C, B]
8 C 1 [C, B]
9 B 1 [B]
到目前为止一切顺利。由于 Spark 中的分区性质,滑动数组在到达分区末尾时会变得更短,因为缺少另一个分区中存在的前导行的信息。对于无法避免的时间序列的结尾,但希望滑动窗口不会错过中间的任何信息(本例中的索引 3 和 4)。
所需的 Dataframe 如下所示:
idx symbol partition sliding
0 A 0 [A, C, B]
1 C 0 [C, B, C]
2 B 0 [B, C, A]
3 C 0 [C, A, C]
4 A 0 [A, C, B]
5 C 1 [C, B, D]
6 B 1 [B, D, C]
7 D 1 [D, C, B]
8 C 1 [C, B]
9 B 1 [B]
最好是有重叠的分区,这样索引 5 和 6 在两个分区中都存在冗余,我可以计算所需的滑动窗口。有什么方法可以实现吗?
使用重叠数据,原始 Dataframe 将如下所示:
idx symbol partition
0 A 0
1 C 0
2 B 0
3 C 0
4 A 0
5 C 0
6 B 0
5 C 1
6 B 1
7 D 1
8 C 1
9 B 1
所以基本上分区 1 的前两行将被复制并附加为分区 0 的最后一行。
我考虑过过滤分区边界信息并在本地计算必要的信息,然后再加入原始数据帧,但我希望有一个更简单的方法。
【问题讨论】:
你能代表你想要的数据框吗,措辞没有太多想法。 我添加了我想要的输出。 【参考方案1】:在您的示例中,如果您只是不对窗口进行分区,它将为您提供所需的内容
sliding_window = Window.orderBy("idx").rowsBetween(Window.currentRow, 2)
sliding_df = df.withColumn("sliding", collect_list("symbol").over(sliding_window))
给予
idx symbol block sliding
0 A 0 [A, C, B]
1 C 0 [C, B, C]
2 B 0 [B, C, A]
3 C 0 [C, A, C]
4 A 0 [A, C, B]
5 C 1 [C, B, D]
6 B 1 [B, D, C]
7 D 1 [D, C, B]
8 C 1 [C, B]
9 B 1 [B]
另外,请注意,collect_list()
不遵守顺序(由于 spark 的分布式特性),因此您的符号会在列表中混淆。
【讨论】:
问题是我明确想要执行滑动窗口分布(这就是我创建“块”列的原因,以在窗口聚合期间保持分区)。不定义分区会将所有数据移动到不可扩展的单个分区。 您可以使用此技巧复制一行:***.com/questions/50624745/… 要确定您需要复制哪些行,您可以使用 max("idx").over(Window.partitionBy(" block")) 并复制那些 idx == max 或 idx == max-1 我也想过这个问题,但只是复制行(5 和 6)并不能将它们带到正确的分区。它们将在第 1 块中,但我需要它们在第 0 块中来计算滑动窗口。 我假设您事先在代码中使用了 repartition() ?如果在这段代码之后你不能移动它,我认为它是 rip。 在此之前没有重新分区调用。块列是使用 spark_partition_id 函数预先创建的。它旨在在使用窗口函数时保留现有的分区。我有一个想法以上是关于在 Spark Dataframe 中实现 Window 的重叠分区的主要内容,如果未能解决你的问题,请参考以下文章