在 Spark 数据框的列中为每个组添加递增的数字
Posted
技术标签:
【中文标题】在 Spark 数据框的列中为每个组添加递增的数字【英文标题】:Add increasing number for each group in column of Spark dataframe 【发布时间】:2020-07-07 09:42:45 【问题描述】:我有一个包含 2 列“Id”和“category”的数据框。对于每个类别,我想标记编码列“Id”,因此预期的结果将是这样的列“Enc_id”
Id Category Enc_id
a1 A 0
a2 A 1
b1 B 0
c1 C 0
c2 C 1
a3 A 2
b2 B 1
b3 B 2
b4 B 3
b4 B 3
b3 B 2
这里的Id可能不是唯一的,所以可能会有重复的行。我想为partitionBy(category)
创建一个窗口,然后在此窗口上应用标签编码(StringIndexer
),但它不起作用。请问有什么提示吗?
【问题讨论】:
是,列 id,唯一吗? @Raghu:可能是也可能不是。我已经相应地编辑了问题。 【参考方案1】:您可以将window
函数与substring
函数一起使用并计算rank
val window = Window.partitionBy($"Category", substring($"Id", 1,1)).orderBy("Id")
df.withColumn("Enc_id", rank().over(window) - 1) // -1 to start the rank from 0
.show(false)
输出:
+---+--------+------+
|Id |Category|Enc_id|
+---+--------+------+
|a1 |A |0 |
|a2 |A |1 |
|a3 |A |2 |
|c1 |C |0 |
|c2 |C |1 |
|b1 |B |0 |
|b2 |B |1 |
|b3 |B |2 |
|b4 |B |3 |
+---+--------+------+
更新1: 对于具有重复 id 的更新案例
df1.groupBy("Id", "Category")
.agg(collect_list("Category") as "list_category")
.withColumn("Enc_id", rank().over(window) - 1)
.withColumn("Category", explode($"list_category"))
.drop("list_category")
.show(false)
输出:
+---+--------+------+
|Id |Category|Enc_id|
+---+--------+------+
|a1 |A |0 |
|a2 |A |1 |
|a3 |A |2 |
|c1 |C |0 |
|c2 |C |1 |
|b1 |B |0 |
|b2 |B |1 |
|b3 |B |2 |
|b3 |B |2 |
|b4 |B |3 |
|b4 |B |3 |
+---+--------+------+
【讨论】:
这很有趣。如果 Id 不是唯一的,它是否也有效?我的意思是可能有重复的行。 行重复时会发生什么?这取决于你想要什么 我刚刚通过在数据框末尾添加 2 个新行来编辑问题中的示例。输出应该与应用 StringIndexer 相同。 对于更新的案例,您需要先分组并应用窗口函数,然后再次分解聚合。检查更新 谢谢!另一个问题,如果我想做类似的事情:collect_list("Category").sliding(2,1),正确的语法应该是什么?因为我看到 collect_list("Category") 是一个 ArrayType(String) 对象,而不是 List。以上是关于在 Spark 数据框的列中为每个组添加递增的数字的主要内容,如果未能解决你的问题,请参考以下文章
如何将 numpy 数组存储在 Pandas 数据框的列中?