PySpark 中的 Groupby cumcount

Posted

技术标签:

【中文标题】PySpark 中的 Groupby cumcount【英文标题】:Groupby cumcount in PySpark 【发布时间】:2019-04-10 16:10:17 【问题描述】:

我有一个如下的数据框:

---------------
id   | name   |
---------------
 1   | joe    |
 1   | john   |
 2   | jane   |
 3   | jo     |
---------------

目标是,如果 'id' 列重复,则从 1 开始向其添加升序。

在 Pandas 中,我可以这样做:

count_id = df.groupby(['id']).cumcount()
count_num = count_id.replace(0, '').astype(str)
df['id'] += count_num

我尝试在 PySpark 中使用相同的逻辑,但没有成功。

结果应该是:


id   | name   |
---------------
 1   | joe    |
 11  | john   |
 2   | jane   |
 3   | jo     |
---------------

如何在 PySpark 中实现同样的效果?非常感谢任何帮助。

【问题讨论】:

【参考方案1】:

要复制该输出,您可以使用Window 为每个id 获取row_number,然后使用concat 将其添加到id

import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.partitionBy("id").orderBy("name")
df.withColumn("row_number", f.row_number().over(w)-1)\
    .withColumn(
        "id", 
        f.when(
            f.col("row_number") > 0, 
            f.concat(f.col("id"), f.col("row_number"))
        ).otherwise(f.col("id"))
    )\
    .drop("row_number")\
    .show()
#+---+----+
#| id|name|
#+---+----+
#|  1| joe|
#| 11|john|
#|  3|  jo|
#|  2|jane|
#+---+----+

注意:这会将id 列转换为StringType 列(如果它还没有的话)。


为了获得您最初在问题中陈述的输出作为所需的结果,除了计算行号之外,您还必须add a group count column。仅当计数大于 1 时才连接行号。

import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.partitionBy("id")
df.withColumn("count", f.count("*").over(w))\
    .withColumn("row_number", f.row_number().over(w.orderBy("name")))\
    .withColumn(
        "id", 
        f.when(
            f.col("count") > 1, 
            f.concat(f.col("id"), f.col("row_number"))
        ).otherwise(f.col("id"))
    )\
    .drop("count", "row_number")\
    .show()
#+---+----+
#| id|name|
#+---+----+
#| 11| joe|
#| 12|john|
#|  3|  jo|
#|  2|jane|
#+---+----+

【讨论】:

以上是关于PySpark 中的 Groupby cumcount的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 1.5 Groupby Sum 用于 Dataframe 中的新列

PySpark Dataframe中的GroupBy水平堆叠[重复]

Pyspark 中的 Groupby 和标准化值

df.groupby('id').resample('D').last() 在 Pandas 中的 Pyspark 等效项

PySpark 中的 Groupby 和 UDF/UDAF,同时保持 DataFrame 结构

如何使用 groupby 和聚合将 pyspark 数据框中的行与多列连接起来