PySpark - 从 UDF 获取行索引

Posted

技术标签:

【中文标题】PySpark - 从 UDF 获取行索引【英文标题】:PySpark - Get index of row from UDF 【发布时间】:2017-12-21 08:36:45 【问题描述】:

我有一个数据框,我需要获取特定行的行号/索引。我想添加一个新行,使其包含字母以及行号/索引,例如。 "A - 1","B - 2"

#sample data
a= sqlContext.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "distances"])

有输出

+------+---------+
|Letter|distances|
+------+---------+
|     A|       20|
|     B|       30|
|     D|       80|
+------+---------+

我希望新的输出是这样的,

+------+---------------+
|Letter|distances|index|
+------+---------------+
|     A|       20|A - 1|
|     B|       30|B - 2|
|     D|       80|D - 3|
+------+---------------+

这是我一直在研究的功能

def cate(letter):
    return letter + " - " + #index
a.withColumn("index", cate(a["Letter"])).show()

【问题讨论】:

也许this 可以帮忙? (创建 ID,然后将行 ID 作为参数传递给您的函数 cate 【参考方案1】:

既然您想(仅)使用 UDF 来实现结果,让我们试试这个

from pyspark.sql.functions import udf, monotonically_increasing_id
from pyspark.sql.types import StringType

#sample data
a= sqlContext.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "distances"])

def cate(letter, idx):
    return letter + " - " + str(idx)
cate_udf = udf(cate, StringType())
a = a.withColumn("temp_index", monotonically_increasing_id())
a = a.\
    withColumn("index", cate_udf(a.Letter, a.temp_index)).\
    drop("temp_index")
a.show()

输出是:

+------+---------+--------------+
|Letter|distances|         index|
+------+---------+--------------+
|     A|       20|         A - 0|
|     B|       30|B - 8589934592|
|     D|       80|D - 8589934593|
+------+---------+--------------+

【讨论】:

monotonically_increasing_id() 不会给出从给定值开始的序列,而是给出一个随机序列。 @Bala - 是的,是的......但它正在增加,所以对于 OP 的用例,我认为它可以用作顺序索引。【参考方案2】:

这应该可以工作

df = spark.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "distances"])
df.createOrReplaceTempView("df")

spark.sql("select concat(Letter,' - ',row_number() over (order by Letter)) as num, * from df").show()

+-----+------+---------+                                                        
|  num|Letter|distances|
+-----+------+---------+
|A - 1|     A|       20|
|B - 2|     B|       30|
|D - 3|     D|       80|
+-----+------+---------+

【讨论】:

这是一个很好的解决方案,但是我受限于使用 UDF 的 pyspark。

以上是关于PySpark - 从 UDF 获取行索引的主要内容,如果未能解决你的问题,请参考以下文章

如何将行传递到pyspark udf

Pyspark:从 Python 到 Pyspark 实现 lambda 函数和 udf

PySpark 分组并逐行应用 UDF 操作

PySpark - 获取重复行的索引

正在分析的 pyspark udf 打印行

pyspark:删除所有行中具有相同值的列