在 Spark Dataframe 中实现 Window 的重叠分区

Posted

技术标签:

【中文标题】在 Spark Dataframe 中实现 Window 的重叠分区【英文标题】:Achieve overlapping partitions for Window in Spark Dataframe 【发布时间】:2019-07-31 07:49:23 【问题描述】:

我的情况如下: 我有一个由符号(分类)值的时间序列组成的数据框。它看起来类似于: idx symbol partition 0 A 0 1 C 0 2 B 0 3 C 0 4 A 0 5 C 1 6 B 1 7 D 1 8 C 1 9 B 1

我现在的目标是制作一个滑动窗口并将 n 个前导值收集到一个数组中。

我通过以下方式实现了这一目标:

sliding_window = Window.partitionBy("partition").orderBy("idx").rowsBetween(Window.currentRow, 2)
sliding_df = df.withColumn("sliding", collect_list("symbol").over(sliding_window))

这导致以下数据框:

    idx    symbol    partition    sliding
    0      A         0            [A, C, B]
    1      C         0            [C, B, C]
    2      B         0            [B, C, A]
    3      C         0               [C, A]
    4      A         0                  [A]
    5      C         1            [C, B, D]
    6      B         1            [B, D, C]
    7      D         1            [D, C, B]
    8      C         1               [C, B]
    9      B         1                  [B]

到目前为止一切顺利。由于 Spark 中的分区性质,滑动数组在到达分区末尾时会变得更短,因为缺少另一个分区中存在的前导行的信息。对于无法避免的时间序列的结尾,但希望滑动窗口不会错过中间的任何信息(本例中的索引 3 和 4)。

所需的 Dataframe 如下所示:

    idx    symbol    partition    sliding
    0      A         0            [A, C, B]
    1      C         0            [C, B, C]
    2      B         0            [B, C, A]
    3      C         0            [C, A, C]
    4      A         0            [A, C, B]
    5      C         1            [C, B, D]
    6      B         1            [B, D, C]
    7      D         1            [D, C, B]
    8      C         1               [C, B]
    9      B         1                  [B]

最好是有重叠的分区,这样索引 5 和 6 在两个分区中都存在冗余,我可以计算所需的滑动窗口。有什么方法可以实现吗?

使用重叠数据,原始 Dataframe 将如下所示:

    idx    symbol    partition    
    0      A         0        
    1      C         0        
    2      B         0        
    3      C         0        
    4      A         0
    5      C         0
    6      B         0
    5      C         1        
    6      B         1        
    7      D         1        
    8      C         1           
    9      B         1              

所以基本上分区 1 的前两行将被复制并附加为分区 0 的最后一行。

我考虑过过滤分区边界信息并在本地计算必要的信息,然后再加入原始数据帧,但我希望有一个更简单的方法。

【问题讨论】:

你能代表你想要的数据框吗,措辞没有太多想法。 我添加了我想要的输出。 【参考方案1】:

在您的示例中,如果您只是不对窗口进行分区,它将为您提供所需的内容

sliding_window = Window.orderBy("idx").rowsBetween(Window.currentRow, 2)
sliding_df = df.withColumn("sliding", collect_list("symbol").over(sliding_window))

给予

 idx    symbol    block    sliding
    0      A         0        [A, C, B]
    1      C         0        [C, B, C]
    2      B         0        [B, C, A]
    3      C         0        [C, A, C]
    4      A         0        [A, C, B]
    5      C         1        [C, B, D]
    6      B         1        [B, D, C]
    7      D         1        [D, C, B]
    8      C         1           [C, B]
    9      B         1              [B]

另外,请注意,collect_list() 不遵守顺序(由于 spark 的分布式特性),因此您的符号会在列表中混淆。

【讨论】:

问题是我明确想要执行滑动窗口分布(这就是我创建“块”列的原因,以在窗口聚合期间保持分区)。不定义分区会将所有数据移动到不可扩展的单个分区。 您可以使用此技巧复制一行:***.com/questions/50624745/… 要确定您需要复制哪些行,您可以使用 max("idx").over(Window.partitionBy(" block")) 并复制那些 idx == max 或 idx == max-1 我也想过这个问题,但只是复制行(5 和 6)并不能将它们带到正确的分区。它们将在第 1 块中,但我需要它们在第 0 块中来计算滑动窗口。 我假设您事先在代码中使用了 repartition() ?如果在这段代码之后你不能移动它,我认为它是 rip。 在此之前没有重新分区调用。块列是使用 spark_partition_id 函数预先创建的。它旨在在使用窗口函数时保留现有的分区。我有一个想法

以上是关于在 Spark Dataframe 中实现 Window 的重叠分区的主要内容,如果未能解决你的问题,请参考以下文章

在 spark 中实现 informatica 逻辑

如何在 Apache Spark 中实现递归算法?

如何在 Spark 中实现“交叉连接”?

在 Spark GraphX 中实现拓扑排序

如何在 spark scala 中实现 uniqueConcatenate、uniqueCount [关闭]

使用 ForeachWriter 在 Spark 流中实现 Cassandra 接收器