pyspark 是不是支持窗口函数(例如 first、last、lag、lead)?

Posted

技术标签:

【中文标题】pyspark 是不是支持窗口函数(例如 first、last、lag、lead)?【英文标题】:Are window functions(e.g. first, last, lag, lead) supported by pyspark?pyspark 是否支持窗口函数(例如 first、last、lag、lead)? 【发布时间】:2015-03-23 21:26:13 【问题描述】:

pyspark 是否支持窗口函数(例如first, last, lag, lead)?

例如,如何按一列分组并按另一列排序,然后选择 每个组的第一行(就像窗口函数一样 做什么)通过 SparkSQL 或数据框?

我发现pyspark.sql.functions 类包含聚合函数firstlast,但它们不能用于groupBy 类。

【问题讨论】:

我不认为它们是直接支持的,但是你可以自己实现它们; groupByKey 为您提供组中所有“行”(对象)的数组(嗯,一个可迭代的) Nexr has window functions 实现为应该在 Spark SQL 中工作的 Hive UDF(用户定义的函数)。您需要使用 Hive 构建 Spark,更改一些配置并注册 UDF。 【参考方案1】:

以上所有函数都可以与窗口函数一起使用。样本看起来有点像这样。

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead, first, last

df.withColumn('value', lag('col1name').over(
    Window.partitionBy('colname2').orderBy('colname3')
    )
)

仅当您使用 partitionBy 子句时才在分区上使用该函数。如果您只想滞后/领先于整个数据,请使用简单的 orderBy 并且 不要使用 patitionBy强>子句。但是,这不会很有效。

如果您希望滞后/领先以相反的方式执行,您还可以使用以下格式:

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead, first, last, desc

df.withColumn('value', lag('col1name').over(
    Window.partitionBy('colname2').orderBy(desc('colname3'))
    )
)

虽然严格来说,您不需要 desc 用于滞后/领先类型的函数。它们主要与 rank / percent_rank / row_number 类型的函数结合使用。

【讨论】:

【参考方案2】:

从 spark 1.4 开始,您可以使用窗口函数。在 pyspark 中,这看起来像这样:

from pyspark.sql.functions import rank
from pyspark.sql import Window
data = sqlContext.read.parquet("/some/data/set")
data_with_rank = data.withColumn("rank", rank().over(Window.partitionBy("col1").orderBy(data["col2"].desc())))
data_with_rank.filter(data_with_rank["rank"] == 1).show()

【讨论】:

请注意,如果您想在窗口操作中使用df.sql 功能,您需要使用HiveContext,而不是SqlContext

以上是关于pyspark 是不是支持窗口函数(例如 first、last、lag、lead)?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark - 窗口函数导致新列

带有窗口函数的 PySpark 数据偏度

PySpark 中的窗口函数和条件过滤器

如何创建与列相关的大小的 Pyspark 窗口函数

pyspark 使用过滤器应用 DataFrame 窗口函数

带有过滤器的pyspark窗口函数