关于 pyspark windows 函数中的 ntile 函数

Posted

技术标签:

【中文标题】关于 pyspark windows 函数中的 ntile 函数【英文标题】:regarding the ntile function in pyspark windows function 【发布时间】:2021-04-02 03:29:48 【问题描述】:

我正在运行以下代码段,

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import ntile
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("James", "Sales", 3000),    \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100) \
  )
 
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)windowSpec  = 
Window.partitionBy("department").orderBy("salary")
df.withColumn("ntile",ntile(2).over(windowSpec)).show()

结果如下图,我很困惑ntile(2)是如何工作的,为什么第一行和第二行得到的nite列值为1,以及为什么第三行和第四行得到12 分别,即使他们的工资值相同。我只是不知道这个 ntile 列是如何计算的?

【问题讨论】:

【参考方案1】:

ntile 窗口函数用于将结果集分解为指定数量的近似相等的组或桶。 ntile 函数返回与每一行关联的存储桶编号。 ntile 名称来源于将结果集划分为四分位(quartile)、十分位(decile)等的做法。

如果行数不能被桶整除,则 ntile 函数会生成两个大小相差 1 的组。 较大的组总是按照 ORDER BY 子句中指定的顺序排在较小的组之前。

您提供的示例将每个部门的员工分为两组:

Window.partitionBy("department").orderBy("salary")
df.withColumn("ntile",ntile(2).over(windowSpec)).show()
首先partitionBy将员工按部门名称划分为partition。 然后,orderBy 将每个分区中的员工按薪水排序。 最后,ntile(2) 函数为每个分区中的每一行分配了一个桶号。每当部门发生变化时,它都会重置存储桶编号。

现在我介绍了一位额外的员工MariaZ,这将有助于更好地理解。

simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("MariaZ", "Sales", 3000),  \
    ("James", "Sales", 3000),    \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100) \
  )
 
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
windowSpec = Window.partitionBy("department").orderBy("salary")
df.withColumn("ntile",ntile(2).over(windowSpec)).show()

输出:

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        James|     Sales|  3000|    1|
|       MariaZ|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    2|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
+-------------+----------+------+-----+

【讨论】:

【参考方案2】:

ntile() 由 SQL 定义以返回大小尽可能相等的图块。这会产生您所看到的结果 - 可以(任意)在不同的行之间拆分平局。

您可以使用直接计算来代替ntile()。这是一种方法:

ceil(rank() over (partition by department order by salary) * 2.0 /
     count(*) over (partition by department)
    )

请注意,图块的大小不一定相同 - 某些图块可能会完全丢失。但是,领带都放在同一个图块中。

【讨论】:

以上是关于关于 pyspark windows 函数中的 ntile 函数的主要内容,如果未能解决你的问题,请参考以下文章

对两列应用 Window.partitionBy 以在 pyspark 中获取 n-core 数据集

如何在 PySpark 中使用窗口函数?

Pyspark SQL/SQL 中的窗口和聚合函数

Pyspark:从随机项目函数创建一个集合

如何使用 PySpark 的 Window 函数来模拟指数衰减?

PySpark 使用函数创建多索引配对 RDD