使用带有 Spark 版本 2.2 的 row_number() 函数在 PySpark DataFrame 中创建每一行的行号

Posted

技术标签:

【中文标题】使用带有 Spark 版本 2.2 的 row_number() 函数在 PySpark DataFrame 中创建每一行的行号【英文标题】:Creating a row number of each row in PySpark DataFrame using row_number() function with Spark version 2.2 【发布时间】:2019-04-02 04:53:31 【问题描述】:

我有一个 PySpark DataFrame -

valuesCol = [('Sweden',31),('Norway',62),('Iceland',13),('Finland',24),('Denmark',52)]
df = sqlContext.createDataFrame(valuesCol,['name','id'])
+-------+---+
|   name| id|
+-------+---+
| Sweden| 31|
| Norway| 62|
|Iceland| 13|
|Finland| 24|
|Denmark| 52|
+-------+---+

我想在这个DataFrame中添加一个row column,也就是该行的行号(序列号),如下图-

我的最终输出应该是:

+-------+---+--------+
|   name| id|row_num |
+-------+---+--------+
| Sweden| 31|       1|
| Norway| 62|       2|
|Iceland| 13|       3|
|Finland| 24|       4|
|Denmark| 52|       5|
+-------+---+--------+

我的 Spark 版本是2.2

我正在尝试这段代码,但它不起作用 -

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
w = Window().orderBy()
df = df.withColumn("row_num", row_number().over(w))
df.show()

我收到一个错误:

AnalysisException: 'Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;'

如果我理解正确,我需要订购一些列,但我不想要像 w = Window().orderBy('id') 这样的东西,因为这会重新排列整个 DataFrame。

谁能建议如何使用row_number() 函数实现上述输出?

【问题讨论】:

@cph_sto- 你可能也喜欢这个。***.com/questions/41313488/… 【参考方案1】:

您应该为 order 子句定义列。如果您不需要订购值,请编写一个虚拟值。下面试试;

from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
w = Window().orderBy(lit('A'))
df = df.withColumn("row_num", row_number().over(w))

【讨论】:

谢谢先生。它完美地工作。只是一个小问题 - 我错过了'lit('A')'。你能解释一下这部分代码在做什么吗?这里的“A”是什么,因为它无论如何都不会出现在最终输出中。无论如何我都会接受它作为答案,因为这会产生预期的输出。 这是一个虚拟值。这意味着你可以写任何东西而不是 A 明白了,谢谢 :) 最后一个问题 - 我已经看到 row_number() 与 partitionBy() 一起使用了很多次,所以如果我从 HDFS 加载数据并添加一列行号,和上面一样,分区会重新洗牌吗?我知道 Spark 只会在调用操作时触发执行,并且 Catalyst 将重新安排操作以产生最佳解决方案。我的问题:我认为在我们从 HDFS 加载数据之后(以及在我们调用任何操作之前)使用 row_numbers() 不会对数据进行重新分区,但只是想征求您的意见! 我认为它会起作用。如果您不需要对数据进行分组并获取每个组的行号,则无需使用 partitionBy 子句。 完美解决方案..;

以上是关于使用带有 Spark 版本 2.2 的 row_number() 函数在 PySpark DataFrame 中创建每一行的行号的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark Submit 中将 s3a 与 Apache spark 2.2(hadoop 2.8) 一起使用?

如何在Spark提交中使用s3a和Apache spark 2.2(hadoop 2.8)?

Spark 2.2 空安全左外连接空指针异常

带有 Jersey 客户端版本 2.2 的 Restful WebService 调用

选择带有stratio lucene索引的cassandra的排序查询,版本> 2.2

Cloudera manager的服务安装以及spark升级到2.2