Pyspark 向数据框添加顺序和确定性索引
Posted
技术标签:
【中文标题】Pyspark 向数据框添加顺序和确定性索引【英文标题】:Pyspark add sequential and deterministic index to dataframe 【发布时间】:2018-09-13 16:28:15 【问题描述】:我需要使用三个非常简单的约束向数据框添加索引列:
从0开始
顺序
确定性
我确定我遗漏了一些明显的东西,因为我发现的示例对于如此简单的任务看起来非常复杂,或者使用非顺序、非确定性越来越单调的 id。我不想使用索引进行压缩,然后必须将以前分隔的列分开,这些列现在位于一列中,因为我的数据帧以 TB 为单位,这似乎没有必要。我不需要按任何东西进行分区,也不需要按任何东西排序,我找到的示例就是这样做的(使用窗口函数和 row_number)。我只需要一个简单的 0 到 df.count 整数序列。我在这里错过了什么?
1、2、3、4、5
【问题讨论】:
DataFrame 本质上是无序的。这是它们用于并行处理的核心原因之一——任何执行器都可以获取数据的任何部分并完成其工作。你可以引入一个订单(如你所展示的),但如果你不按任何东西排序,它怎么能是确定性的呢? 顺便说一句,我相信monotonically_increasing_id
将是确定性的,只要您不更改分区数。
很公平,也许我在这里断章取义地使用了索引这个词。我的意思是:如何添加一个有序的、单调递增 1 个序列 0:df.count 的列?
【参考方案1】:
我的意思是:如何添加一个有序、单调递增 1 个序列 0:df.count 的列? (from comments)
您可以在此处使用row_number()
,但为此您需要指定orderBy()
。由于您没有排序列,因此只需使用monotonically_increasing_id()
。
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window
df = df.withColumn(
"index",
row_number().over(Window.orderBy(monotonically_increasing_id()))-1
)
另外,row_number()
从 1 开始,因此您必须减去 1 才能使其从 0 开始。最后一个值将是 df.count - 1
。
我不想用索引压缩,然后必须将以前分离的列分开,现在在一个列中
您可以使用zipWithIndex
,如果您随后调用map
,以避免所有分隔的列变成一个列:
cols = df.columns
df = df.rdd.zipWithIndex().map(lambda row: (row[1],) + tuple(row[0])).toDF(["index"] + cols
【讨论】:
【参考方案2】:不确定性能,但这是一个技巧。
注意 - toPandas 会将所有数据收集到驱动程序
from pyspark.sql import SparkSession
# speed up toPandas using arrow
spark = SparkSession.builder.appName('seq-no') \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.config("spark.sql.execution.arrow.enabled", "true") \
.getOrCreate()
df = spark.createDataFrame([
('id1', "a"),
('id2', "b"),
('id2', "c"),
], ["ID", "Text"])
df1 = spark.createDataFrame(df.toPandas().reset_index()).withColumnRenamed("index","seq_no")
df1.show()
+------+---+----+
|seq_no| ID|Text|
+------+---+----+
| 0|id1| a|
| 1|id2| b|
| 2|id2| c|
+------+---+----+
【讨论】:
df.toPandas()
你在开玩笑吗,如果 to_pandas 是可能的,那么一开始就没有使用 spark!以上是关于Pyspark 向数据框添加顺序和确定性索引的主要内容,如果未能解决你的问题,请参考以下文章