[Py]Spark SQL:使用框架的输入行约束窗口的每一帧

Posted

技术标签:

【中文标题】[Py]Spark SQL:使用框架的输入行约束窗口的每一帧【英文标题】:[Py]Spark SQL: Constrain each frame of a Window using the frame's input row 【发布时间】:2017-09-22 22:21:30 【问题描述】:

我想根据当前输入行来限制聚合函数使用窗口框架中的哪些行。例如,给定一个 DataFrame df 和一个 Window w,我希望能够执行以下操作:

df2 = df.withColumn("foo", first(col("bar").filter(...)).over(w))

.filter 将根据框架的输入行从当前窗口框架中删除行。

我的具体用例如下:给定一个DataFramedf

+-----+--+--+
|group|n1|n2| 
+-----+--+--+
|    1| 1| 6|
|    1| 0| 3|
|    1| 2| 2|
|    1| 3| 5|
|    2| 0| 5|
|    2| 0| 7|
|    2| 3| 2|
|    2| 5| 9|
+-----+--+--+

窗口

w = Window.partitionBy("group")\
          .orderBy("n1", "n2")\
          .rowsBetween(Window.currentRow + 1, Window.unboundedFollowing)

还有一些正长i,你如何在r的每个输入行中找到第一行(fr)使得r.n1fr.n1,r.n2fr.n2 , 和 max(fr.n1 - r.n1, fr.n2 - r.n2) i?返回的值可以是fr.n1frdf 中的行索引。因此,对于 i = 6,示例 df 的输出将是

+-----+--+--+-----+
|group|n1|n2|fr.n1|
+-----+--+--+-----+
|    1| 1| 6| null|
|    1| 0| 3|    1|
|    1| 2| 2|    3|
|    1| 3| 5| null|
|    2| 0| 5|    5|
|    2| 0| 7|    5|
|    2| 3| 2| null|
|    2| 5| 9| null|
+-----+--+--+-----+

我一直在研究 Spark API 并查看 Window、first 和 when 的示例,但我似乎无法将它们拼凑在一起。这甚至可以通过 Window 和聚合函数实现,还是我完全不合时宜?

【问题讨论】:

你能否通过一些例子来解释一下你真正想做的事情现在给出的问题我不清楚。 如果| 2| 3| 2| 5| 是真的那么| 1| 2| 2| 3| 是真的吗?反之亦然。他们互相矛盾。请查看您的最终 df。 糟糕!我更新了原始示例解决方案以使其正确。我还在开头的一般描述中添加了一个示例,希望能让事情更清楚。 【参考方案1】:

您将无法仅使用窗口函数和聚合来做到这一点,您需要一个自联接: 加入:

df = sc.parallelize([[1, 1, 6],[1, 0, 3],[1, 2, 2],[1, 3, 5],[2, 0, 5],[2, 0, 7],[2, 3, 2],[2, 5, 9]]).toDF(["group","n1","n2"])

import pyspark.sql.functions as psf
df_r = df.select([df[c].alias("r_" + c) for c in df.columns])
df_join = df_r\
    .join(df, (df_r.r_group == df.group) 
        & (df_r.r_n1 < df.n1) 
        & (df_r.r_n2 < df.n2) 
        & (psf.greatest(df.n1 - df_r.r_n1, df.n2 - df_r.r_n2) < i), "leftouter")\
    .drop("group")

现在我们可以应用窗口函数只保留第一行:

w = Window.partitionBy("r_group", "r_n1", "r_n2").orderBy("n1", "n2")
res = df_join\
    .withColumn("rn", psf.row_number().over(w))\
    .filter("rn = 1").drop("rn")

    +-------+----+----+----+----+
    |r_group|r_n1|r_n2|  n1|  n2|
    +-------+----+----+----+----+
    |      1|   0|   3|   1|   6|
    |      1|   1|   6|null|null|
    |      1|   2|   2|   3|   5|
    |      1|   3|   5|null|null|
    |      2|   0|   5|   5|   9|
    |      2|   0|   7|   5|   9|
    |      2|   3|   2|null|null|
    |      2|   5|   9|null|null|
    +-------+----+----+----+----+

【讨论】:

感谢您的回答。它需要一个连接是有道理的,因为这实际上是过滤窗口框架会做的事情,至少在我的具体情况下。

以上是关于[Py]Spark SQL:使用框架的输入行约束窗口的每一帧的主要内容,如果未能解决你的问题,请参考以下文章

python spark-sql-application.py

python spark-sql-ranking.py

python spark-sql-aggregations.py

python spark-sql-operations.py

python spark-sql-sorting.py

python spark-sql-aggregations.py