如何在 PySpark 中使用窗口函数?

Posted

技术标签:

【中文标题】如何在 PySpark 中使用窗口函数?【英文标题】:How to use window functions in PySpark? 【发布时间】:2015-08-06 14:02:54 【问题描述】:

我正在尝试将一些 Windows 函数(ntilepercentRank)用于数据框,但我不知道如何使用它们。

谁能帮我解决这个问题?在Python API documentation 中没有关于它的示例。

具体来说,我正在尝试在我的数据框中获取数字字段的分位数。

我使用的是 spark 1.4.0。

【问题讨论】:

【参考方案1】:

为了能够使用窗口功能,您必须先创建一个窗口。定义与普通 SQL 几乎相同,这意味着您可以定义顺序、分区或两者。首先让我们创建一些虚拟数据:

import numpy as np
np.random.seed(1)

keys = ["foo"] * 10 + ["bar"] * 10
values = np.hstack([np.random.normal(0, 1, 10), np.random.normal(10, 1, 100)])

df = sqlContext.createDataFrame([
   "k": k, "v": round(float(v), 3) for k, v in zip(keys, values)])

确保您使用的是HiveContext(仅限 Spark

from pyspark.sql import HiveContext

assert isinstance(sqlContext, HiveContext)

创建一个窗口:

from pyspark.sql.window import Window

w =  Window.partitionBy(df.k).orderBy(df.v)

相当于

(PARTITION BY k ORDER BY v) 

在 SQL 中。

根据经验,窗口定义应始终包含PARTITION BY 子句,否则 Spark 会将所有数据移动到单个分区。 ORDER BY 是某些函数所必需的,而在不同的情况下(通常是聚合)可能是可选的。

还有两个可选的可用于定义窗口跨度 - ROWS BETWEENRANGE BETWEEN。在这种特殊情况下,这些对我们没有用处。

最后我们可以用它来查询:

from pyspark.sql.functions import percentRank, ntile

df.select(
    "k", "v",
    percentRank().over(w).alias("percent_rank"),
    ntile(3).over(w).alias("ntile3")
)

请注意,ntile 与分位数没有任何关系。

【讨论】:

以上是关于如何在 PySpark 中使用窗口函数?的主要内容,如果未能解决你的问题,请参考以下文章

带有窗口函数的 PySpark 数据偏度

如何创建与列相关的大小的 Pyspark 窗口函数

如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?

如何使用滚动窗口函数计算 Pyspark Dataframe 中等于某个值的相邻值的数量?

如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?

Spark 1.5.0 (PySpark) 案例当逻辑和滞后窗口函数