Spark向数据框添加索引并附加其他没有索引的数据集
Posted
技术标签:
【中文标题】Spark向数据框添加索引并附加其他没有索引的数据集【英文标题】:Spark adding indexes to dataframe and append other dataset that doesn't have index 【发布时间】:2020-07-16 23:39:09 【问题描述】:我有一个包含列用户 ID 和索引值的数据集。
+---------+--------+
| userid | index|
+---------+--------+
| user1| 1|
| user2| 2|
| user3| 3|
| user4| 4|
| user5| 5|
| user6| 6|
| user7| 7|
| user8| 8|
| user9| 9|
| user10| 10|
+---------+--------+
我想向它附加一个新的数据框,并为新添加的列添加一个索引。
userid
是唯一的,现有数据框将没有 Dataframe 2 用户 ID。
+----------+
| userid |
+----------+
| user11|
| user21|
| user41|
| user51|
| user64|
+----------+
新添加的userid
和索引的预期输出
+---------+--------+
| userid | index|
+---------+--------+
| user1| 1|
| user2| 2|
| user3| 3|
| user4| 4|
| user5| 5|
| user6| 6|
| user7| 7|
| user8| 8|
| user9| 9|
| user10| 10|
| user11| 11|
| user21| 12|
| user41| 13|
| user51| 14|
| user64| 15|
+---------+--------+
是否可以通过从给定索引值传递第二个数据帧的最大索引值和起始索引来实现这一点。
【问题讨论】:
您有机会尝试答案吗?对你有用吗? 嗨 Raghu,是的,我有机会但决定使用不同的方法,因为第二个数据集较小,可能不需要在df_merge
上进行窗口分区
【参考方案1】:
如果用户 ID 有一些排序,那么您可以使用 rownumber 函数。即使没有,也可以使用 monotonically_increasing_id() 添加一个 id。现在我假设可以订购用户 ID。然后你可以这样做:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
df_merge = df1.select('userid').union(df2.select('userid'))
w=Window.orderBy('userid')
df_result = df_merge.withColumn('indexid',F.row_number().over(w))
编辑:在评论讨论后。
#%% Test data and imports
import pyspark.sql.functions as F
from pyspark.sql import Window
df = sqlContext.createDataFrame([('a',100),('ab',50),('ba',300),('ced',60),('d',500)],schema=['userid','index'])
df1 = sqlContext.createDataFrame([('fgh',100),('ff',50),('fe',300),('er',60),('fi',500)],schema=['userid','dummy'])
#%%
#%% Merge the two dataframes, with a null columns as the index
df1=df1.withColumn('index', F.lit(None))
df_merge = df.select(df.columns).union(df1.select(df.columns))
#%%Define a window to arrange the newly added rows at the last and order them by userid
#%% The user id, even though random strings, can be ordered
w= Window.orderBy(F.col('index').asc_nulls_last(),F.col('userid'))# if possible add a partition column here, otherwise all your data will come in one partition, consider salting
#%% For the newly added rows, define index as the maximum value + increment of number of rows in main dataframe
df_final = df_merge.withColumn("index_new",F.when(~F.col('index').isNull(),F.col('index')).otherwise((F.last(F.col('index'),ignorenulls=True).over(w))+F.sum(F.lit(1)).over(w)))
#%% If number of rows in main dataframe is huge, then add an offset in the above line
df_final.show()
+------+-----+---------+
|userid|index|index_new|
+------+-----+---------+
| ab| 50| 50|
| ced| 60| 60|
| a| 100| 100|
| ba| 300| 300|
| d| 500| 500|
| er| null| 506|
| fe| null| 507|
| ff| null| 508|
| fgh| null| 509|
| fi| null| 510|
+------+-----+---------+
【讨论】:
用户 ID 未排序。此外,数据帧 1 中有数十亿个 id,其现有索引无法更改。 现有的 id 是否如输入所示连续编号? 新用户ID应该增加1,还是可以是随机的但只是唯一的?您是否只在寻找 scala 解决方案? 是的,我更喜欢它增加 1 而不是随机数。我使用 Scala,但 python 也可以使用以上是关于Spark向数据框添加索引并附加其他没有索引的数据集的主要内容,如果未能解决你的问题,请参考以下文章
向 pyspark 数据框添加行索引(并排添加新列/连接数据框)