PySpark 中的 Groupby cumcount
Posted
技术标签:
【中文标题】PySpark 中的 Groupby cumcount【英文标题】:Groupby cumcount in PySpark 【发布时间】:2019-04-10 16:10:17 【问题描述】:我有一个如下的数据框:
---------------
id | name |
---------------
1 | joe |
1 | john |
2 | jane |
3 | jo |
---------------
目标是,如果 'id' 列重复,则从 1 开始向其添加升序。
在 Pandas 中,我可以这样做:
count_id = df.groupby(['id']).cumcount()
count_num = count_id.replace(0, '').astype(str)
df['id'] += count_num
我尝试在 PySpark 中使用相同的逻辑,但没有成功。
结果应该是:
id | name |
---------------
1 | joe |
11 | john |
2 | jane |
3 | jo |
---------------
如何在 PySpark 中实现同样的效果?非常感谢任何帮助。
【问题讨论】:
【参考方案1】:要复制该输出,您可以使用Window
为每个id
获取row_number
,然后使用concat
将其添加到id
。
import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.partitionBy("id").orderBy("name")
df.withColumn("row_number", f.row_number().over(w)-1)\
.withColumn(
"id",
f.when(
f.col("row_number") > 0,
f.concat(f.col("id"), f.col("row_number"))
).otherwise(f.col("id"))
)\
.drop("row_number")\
.show()
#+---+----+
#| id|name|
#+---+----+
#| 1| joe|
#| 11|john|
#| 3| jo|
#| 2|jane|
#+---+----+
注意:这会将id
列转换为StringType
列(如果它还没有的话)。
为了获得您最初在问题中陈述的输出作为所需的结果,除了计算行号之外,您还必须add a group count column。仅当计数大于 1 时才连接行号。
import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.partitionBy("id")
df.withColumn("count", f.count("*").over(w))\
.withColumn("row_number", f.row_number().over(w.orderBy("name")))\
.withColumn(
"id",
f.when(
f.col("count") > 1,
f.concat(f.col("id"), f.col("row_number"))
).otherwise(f.col("id"))
)\
.drop("count", "row_number")\
.show()
#+---+----+
#| id|name|
#+---+----+
#| 11| joe|
#| 12|john|
#| 3| jo|
#| 2|jane|
#+---+----+
【讨论】:
以上是关于PySpark 中的 Groupby cumcount的主要内容,如果未能解决你的问题,请参考以下文章
PySpark 1.5 Groupby Sum 用于 Dataframe 中的新列
PySpark Dataframe中的GroupBy水平堆叠[重复]
df.groupby('id').resample('D').last() 在 Pandas 中的 Pyspark 等效项