PySpark - 获取组中每一行的行号

Posted

技术标签:

【中文标题】PySpark - 获取组中每一行的行号【英文标题】:PySpark - get row number for each row in a group 【发布时间】:2017-08-04 19:12:06 【问题描述】:

使用 pyspark,我希望能够对 spark 数据框进行分组,对组进行排序,然后提供行号。所以

Group    Date
  A      2000
  A      2002
  A      2007
  B      1999
  B      2015

会变成

Group    Date    row_num
  A      2000      0
  A      2002      1
  A      2007      2
  B      1999      0
  B      2015      1

【问题讨论】:

为什么这么讨厌? 不幸的是,问题必须包含您自己测试过的代码(并且不起作用)的错误印象,尽管根据 SO 询问指南,这是当然不是的情况:***.com/help/on-topic 【参考方案1】:

使用窗口函数:

from pyspark.sql.window import *
from pyspark.sql.functions import row_number

df.withColumn("row_num", row_number().over(Window.partitionBy("Group").orderBy("Date")))

【讨论】:

不错!我在 withColumn... 中插入了一个缺少的逗号 :) 欢迎来到 SO 并恭​​喜您回答了您的第一个问题!继续前进,不要失望(有时这可能是一个严酷的地方......) - 另请查看我的编辑以了解如何使用代码突出显示 @desertnaut 我们可以保留数据帧的自然顺序而不是orderby 进行排序吗?【参考方案2】:

公认的解决方案几乎是正确的。这是基于问题中要求的输出的解决方案:

df = spark.createDataFrame([("A", 2000), ("A", 2002), ("A", 2007), ("B", 1999), ("B", 2015)], ["Group", "Date"])

+-----+----+
|Group|Date|
+-----+----+
|    A|2000|
|    A|2002|
|    A|2007|
|    B|1999|
|    B|2015|
+-----+----+

# accepted solution above


from pyspark.sql.window import *
from pyspark.sql.functions import row_number

df.withColumn("row_num", row_number().over(Window.partitionBy("Group").orderBy("Date")))


# accepted solution above output


+-----+----+-------+
|Group|Date|row_num|
+-----+----+-------+
|    B|1999|      1|
|    B|2015|      2|
|    A|2000|      1|
|    A|2002|      2|
|    A|2007|      3|
+-----+----+-------+

如您所见,函数 row_number 从 1 而不是 0 开始,并且请求的问题希望 row_num 从 0 开始。简单的更改如下:

df.withColumn("row_num", row_number().over(Window.partitionBy("Group").orderBy("Date"))-1).show()

输出:

+-----+----+-------+
|Group|Date|row_num|
+-----+----+-------+
|    B|1999|      0|
|    B|2015|      1|
|    A|2000|      0|
|    A|2002|      1|
|    A|2007|      2|
+-----+----+-------+

然后您可以按您想要的任何顺序对“组”列进行排序。上面的解决方案几乎有它,但重要的是要记住 row_number 以 1 而不是 0 开头。

【讨论】:

以上是关于PySpark - 获取组中每一行的行号的主要内容,如果未能解决你的问题,请参考以下文章

获取组中每一天的最后记录

如何获取 HIVE/PySpark 表中每一列的唯一值?

使用带有 Spark 版本 2.2 的 row_number() 函数在 PySpark DataFrame 中创建每一行的行号

如何获取给定记录组中每一天的最后一条记录?

MySQL获取组中每x分钟的列的平均值和总和

IO流应用:在文本文件前增加行号