带有过滤器的pyspark窗口函数

Posted

技术标签:

【中文标题】带有过滤器的pyspark窗口函数【英文标题】:pyspark window function with filter 【发布时间】:2018-03-28 15:52:57 【问题描述】:

我有以下带有列的 DataFrame:["id", "timestamp", "x", "y"]:

+---+----------+---+---+
| id| timestamp|  x|  y|
+---+----------+---+---+
|  0|1443489380|100|  1|
|  0|1443489390|200|  0|
|  0|1443489400|300|  0|
|  1|1443489410|400|  1|
|  1|1443489550|100|  1|
|  2|1443489560|600|  0|
|  2|1443489570|200|  0|
|  2|1443489580|700|  1|
+---+----------+---+---+

我已经定义了以下Window

from pyspark.sql import Window
w = Window.partitionBy("id").orderBy("timestamp")

我想只提取窗口w 中的第一行和最后一行数据。我怎样才能做到这一点?

【问题讨论】:

您需要pyspark.sql.functions.first()pyspark.sql.functions.last()df.select(first('id').over(w).alias('id'), ....) 【参考方案1】:

如果您希望第一个和最后一个值位于同一行,一种方法是使用pyspark.sql.functions.first()

from pyspark.sql import Window
from pyspark.sql.functions import first

w1 = Window.partitionBy("id").orderBy("timestamp")
w2 = Window.partitionBy("id").orderBy(f.col("timestamp").desc())  # sort desc

df.select(
        "id",
        *([first(c).over(w1).alias("first_" + c) for c in df.columns if c != "id"] +
          [first(c).over(w2).alias("last_" + c) for c in df.columns if c != "id"])
    )\
    .distinct()\
    .show()
#+---+---------------+-------+-------+--------------+------+------+
#| id|first_timestamp|first_x|first_y|last_timestamp|last_x|last_y|
#+---+---------------+-------+-------+--------------+------+------+
#|  0|     1443489380|    100|      1|    1443489400|   300|     0|
#|  1|     1443489410|    400|      1|    1443489550|   100|     1|
#|  2|     1443489560|    600|      0|    1443489580|   700|     1|
#+---+---------------+-------+-------+--------------+------+------+

【讨论】:

以上是关于带有过滤器的pyspark窗口函数的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 使用过滤器应用 DataFrame 窗口函数

带有窗口函数的 PySpark 数据偏度

Django ORM:带有后续过滤的窗口函数

PySpark - 窗口函数导致新列

如何创建与列相关的大小的 Pyspark 窗口函数

pyspark 是不是支持窗口函数(例如 first、last、lag、lead)?