向数据框添加索引。 Pyspark 2.4.4 [重复]

Posted

技术标签:

【中文标题】向数据框添加索引。 Pyspark 2.4.4 [重复]【英文标题】:Add an index to a dataframe. Pyspark 2.4.4 [duplicate] 【发布时间】:2021-02-04 00:31:01 【问题描述】:

有很多例子都给出了相同的基本例子。

dfWithIndex = df.withColumn('f_index', \ 
  pyspark.sql.functions.lit(1).cast(pyspark.sql.types.LongType()))
rdd = df.rdd.zipWithIndex().map(lambda row, rowId: (list(row) + [rowId + 1]))
dfIndexed = sqlContext.createDataFrame(rdd, schema=dfWithIndex.schema)

使用这些 lambdas 真的很新,但是 printScema-ing rdd 与一个普通的 zipEithIndex() 给了我一个两列数据框.. _1 (struct) 和一个 _2 long 用于索引本身。这就是 lambda 似乎引用的内容。但是我收到了这个错误:

TypeError: <lambda>() missing 1 required positional argument: 'rowId'

【问题讨论】:

在执行普通的 rdd.show() 时,我会按预期在 _2 中递增整数,这很好。我只是想让数据框恢复到正常的列状态... 【参考方案1】:

你已经接近了。您只需要稍微修改 lambda 函数。它应该接受 1 个参数,类似于 (Row, id),并返回单个 Row 对象。

from pyspark.sql import Row
from pyspark.sql.types import StructField, LongType

df = spark.createDataFrame([['a'],['b'],['c']],['val'])
df2 = df.rdd.zipWithIndex().map(
    lambda r: Row(*r[0], r[1])
).toDF(df.schema.add(StructField('id', LongType(), False)))

df2.show()
+---+---+
|val| id|
+---+---+
|  a|  0|
|  b|  1|
|  c|  2|
+---+---+

【讨论】:

以上是关于向数据框添加索引。 Pyspark 2.4.4 [重复]的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 向数据框添加顺序和确定性索引

向 pyspark 中的数据框添加列

向数据框添加列并在 pyspark 中更新

如何在 pyspark aws emr 中向现有数据框添加多列?

如何创建 Pyspark UDF 以向数据框添加新列

向 pyspark Dataframe 添加新行