Pyspark 向数据框添加顺序和确定性索引

Posted

技术标签:

【中文标题】Pyspark 向数据框添加顺序和确定性索引【英文标题】:Pyspark add sequential and deterministic index to dataframe 【发布时间】:2018-09-13 16:28:15 【问题描述】:

我需要使用三个非常简单的约束向数据框添加索引列:

从0开始

顺序

确定性

我确定我遗漏了一些明显的东西,因为我发现的示例对于如此简单的任务看起来非常复杂,或者使用非顺序、非确定性越来越单调的 id。我不想使用索引进行压缩,然后必须将以前分隔的列分开,这些列现在位于一列中,因为我的数据帧以 TB 为单位,这似乎没有必要。我不需要按任何东西进行分区,也不需要按任何东西排序,我找到的示例就是这样做的(使用窗口函数和 row_number)。我只需要一个简单的 0 到 df.count 整数序列。我在这里错过了什么?

1、2、3、4、5

【问题讨论】:

DataFrame 本质上是无序的。这是它们用于并行处理的核心原因之一——任何执行器都可以获取数据的任何部分并完成其工作。你可以引入一个订单(如你所展示的),但如果你不按任何东西排序,它怎么能是确定性的呢? 顺便说一句,我相信monotonically_increasing_id 将是确定性的,只要您不更改分区数。 很公平,也许我在这里断章取义地使用了索引这个词。我的意思是:如何添加一个有序的、单调递增 1 个序列 0:df.count 的列? 【参考方案1】:

我的意思是:如何添加一个有序、单调递增 1 个序列 0:df.count 的列? (from comments)

您可以在此处使用row_number(),但为此您需要指定orderBy()。由于您没有排序列,因此只需使用monotonically_increasing_id()

from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window

df = df.withColumn(
    "index",
    row_number().over(Window.orderBy(monotonically_increasing_id()))-1
)

另外,row_number() 从 1 开始,因此您必须减去 1 才能使其从 0 开始。最后一个值将是 df.count - 1


我不想用索引压缩,然后必须将以前分离的列分开,现在在一个列中

可以使用zipWithIndex,如果您随后调用map,以避免所有分隔的列变成一个列:

cols = df.columns
df = df.rdd.zipWithIndex().map(lambda row: (row[1],) + tuple(row[0])).toDF(["index"] + cols

【讨论】:

【参考方案2】:

不确定性能,但这是一个技巧。

注意 - toPandas 会将所有数据收集到驱动程序

from pyspark.sql import SparkSession

# speed up toPandas using arrow
spark = SparkSession.builder.appName('seq-no') \
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .config("spark.sql.execution.arrow.enabled", "true") \
        .getOrCreate()

df = spark.createDataFrame([
    ('id1', "a"),
    ('id2', "b"),
    ('id2', "c"),
], ["ID", "Text"])

df1 = spark.createDataFrame(df.toPandas().reset_index()).withColumnRenamed("index","seq_no")

df1.show()

+------+---+----+
|seq_no| ID|Text|
+------+---+----+
|     0|id1|   a|
|     1|id2|   b|
|     2|id2|   c|
+------+---+----+

【讨论】:

df.toPandas() 你在开玩笑吗,如果 to_pandas 是可能的,那么一开始就没有使用 spark!

以上是关于Pyspark 向数据框添加顺序和确定性索引的主要内容,如果未能解决你的问题,请参考以下文章

向数据框添加索引。 Pyspark 2.4.4 [重复]

向 pyspark 中的数据框添加列

向数据框添加列并在 pyspark 中更新

如何在 pyspark aws emr 中向现有数据框添加多列?

如何创建 Pyspark UDF 以向数据框添加新列

我只需要在 pyspark 数据框中附加那些具有非空值的人