PySpark - 添加一个递增的数字列,该列根据另一列值的变化重置为 1

Posted

技术标签:

【中文标题】PySpark - 添加一个递增的数字列,该列根据另一列值的变化重置为 1【英文标题】:PySpark - Add an incrementing number column which resets to 1 based on another column value changing 【发布时间】:2020-01-23 14:29:44 【问题描述】:

首先我应该说我对 Python 和 PySpark 都很陌生,我的大部分经验是在 MS SQL、C#、VB.NET 等方面......

我有一个要添加“group_number”字段的数据框。我需要这个数字根据日期时间字段递增,并根据值字段重置。所以我希望输出如下:

+-----+----------------+-------------+
|value|datetime        |group_number |
+-----+----------------+-------------+
|00001|2020-01-01 00:00|1            |
|00001|2020-01-01 02:10|2            |
|00001|2020-01-01 05:14|3            |
|00002|2020-01-01 00:03|1            |
|00002|2020-01-01 02:04|2            |
|00003|2020-01-01 03:03|1            |
+-----+----------------+-------------+

日期时间值有点无关紧要,因为它们可以在不同的点开始和结束,并在每组内增加不同的数量,我只需要一个数字(1 到 x),它按时间顺序对每个“值”字段进行排序。

我已经编写了一个 udf 来尝试这样做,但我认为它没有正确地对它们进行排序,我最终得到的结果大多是 '1' 值和偶尔的 '2'。

udf定义为:

def createGroupID(value):
    global iterationCount
    global currentValue

    if value == currentValue:
        iterationCount = iterationCount + 1
        return iterationCount

    iterationCount = 1
    currentValue = value
    return iterationCount

两个全局变量在主应用程序中初始化,udf被调用为:

    createCountNumber = udf(createGroupID, StringType())
    newdf = df.withColumn("group_number", createCountNumber('value'))

如果有人能帮我解决这个问题,我将不胜感激!非常感谢。

【问题讨论】:

【参考方案1】:

感谢 Anil 为我指明了正确的方向......我在 Groupby cumcount in PySpark 找到了获得完整解决方案的方法

我需要添加以下内容:

    w = Window.partitionBy("value")
    df = df.withColumn("count", count("*").over(w))\
        .withColumn("group_number", row_number().over(w.orderBy("datetime")))

现在我得到了我需要的东西!

哦,我还需要添加一行,让我使用上面代码块中的所有功能:

from pyspark.sql.functions import col, size, lit, udf, concat, row_number, count, when

【讨论】:

【参考方案2】:

将数据作为 pandas 数据框读取。然后您可以按值分组并进行累积计数:

import pandas as pd
df = pd.read_excel(r'file_path')
df['seq'] = df.groupby(['value', 'datetime']).cumcount()+1
df

【讨论】:

嗨 Anil - 感谢您对我的问题做出如此迅速的回应。我计划在某个时候将其迁移到 AWS EMR,但我不确定 pandas 是否可以用于此。此外,我的数据集有时可能非常大。这些中的任何一个都可能给我带来问题吗? (与此同时,我正在尝试测试您的建议,谢谢) 对于您的要求,下面的链接可能很有用。请检查:***.com/questions/41890485/… 嗨,阿尼尔 - 感谢您的链接。这很有帮助,但我缺少的一个步骤是按日期时间的顺序增加计数,而不是只获得它们的计数......作为旁注,我设法将 Pandas 选项编码,但我立即点击了Java 堆空间错误....对于这么大的数据集,我想这并不奇怪。如果没有其他可以减少我的选择:) 我刚刚找到了这个页面:***.com/questions/55617034/…,我认为可以帮助我。我将通过这个来查看它是否有效,并且一旦我到达某个地方就会回复。再次感谢阿尼尔——你的建议似乎为我指明了正确的方向。真的很感激。

以上是关于PySpark - 添加一个递增的数字列,该列根据另一列值的变化重置为 1的主要内容,如果未能解决你的问题,请参考以下文章

PySpark - 根据另一列值的降序添加递增的整数排名值

如何从 PySpark 中的多个列创建字典列表,其中键是列名,值是该列的值?

Pyspark Dataframe Imputations - 根据指定条件用列平均值替换未知和缺失值

从 PySpark 中的复杂列中提取值

如何按列对pyspark中的数据框进行分组并以该列作为键并以记录列表作为其值来获取字典?

在 pyspark 中处理大数字的数据类型