PySpark - 添加一个递增的数字列,该列根据另一列值的变化重置为 1
Posted
技术标签:
【中文标题】PySpark - 添加一个递增的数字列,该列根据另一列值的变化重置为 1【英文标题】:PySpark - Add an incrementing number column which resets to 1 based on another column value changing 【发布时间】:2020-01-23 14:29:44 【问题描述】:首先我应该说我对 Python 和 PySpark 都很陌生,我的大部分经验是在 MS SQL、C#、VB.NET 等方面......
我有一个要添加“group_number”字段的数据框。我需要这个数字根据日期时间字段递增,并根据值字段重置。所以我希望输出如下:
+-----+----------------+-------------+
|value|datetime |group_number |
+-----+----------------+-------------+
|00001|2020-01-01 00:00|1 |
|00001|2020-01-01 02:10|2 |
|00001|2020-01-01 05:14|3 |
|00002|2020-01-01 00:03|1 |
|00002|2020-01-01 02:04|2 |
|00003|2020-01-01 03:03|1 |
+-----+----------------+-------------+
日期时间值有点无关紧要,因为它们可以在不同的点开始和结束,并在每组内增加不同的数量,我只需要一个数字(1 到 x),它按时间顺序对每个“值”字段进行排序。
我已经编写了一个 udf 来尝试这样做,但我认为它没有正确地对它们进行排序,我最终得到的结果大多是 '1' 值和偶尔的 '2'。
udf定义为:
def createGroupID(value):
global iterationCount
global currentValue
if value == currentValue:
iterationCount = iterationCount + 1
return iterationCount
iterationCount = 1
currentValue = value
return iterationCount
两个全局变量在主应用程序中初始化,udf被调用为:
createCountNumber = udf(createGroupID, StringType())
newdf = df.withColumn("group_number", createCountNumber('value'))
如果有人能帮我解决这个问题,我将不胜感激!非常感谢。
【问题讨论】:
【参考方案1】:感谢 Anil 为我指明了正确的方向......我在 Groupby cumcount in PySpark 找到了获得完整解决方案的方法
我需要添加以下内容:
w = Window.partitionBy("value")
df = df.withColumn("count", count("*").over(w))\
.withColumn("group_number", row_number().over(w.orderBy("datetime")))
现在我得到了我需要的东西!
哦,我还需要添加一行,让我使用上面代码块中的所有功能:
from pyspark.sql.functions import col, size, lit, udf, concat, row_number, count, when
【讨论】:
【参考方案2】:将数据作为 pandas 数据框读取。然后您可以按值分组并进行累积计数:
import pandas as pd
df = pd.read_excel(r'file_path')
df['seq'] = df.groupby(['value', 'datetime']).cumcount()+1
df
【讨论】:
嗨 Anil - 感谢您对我的问题做出如此迅速的回应。我计划在某个时候将其迁移到 AWS EMR,但我不确定 pandas 是否可以用于此。此外,我的数据集有时可能非常大。这些中的任何一个都可能给我带来问题吗? (与此同时,我正在尝试测试您的建议,谢谢) 对于您的要求,下面的链接可能很有用。请检查:***.com/questions/41890485/… 嗨,阿尼尔 - 感谢您的链接。这很有帮助,但我缺少的一个步骤是按日期时间的顺序增加计数,而不是只获得它们的计数......作为旁注,我设法将 Pandas 选项编码,但我立即点击了Java 堆空间错误....对于这么大的数据集,我想这并不奇怪。如果没有其他可以减少我的选择:) 我刚刚找到了这个页面:***.com/questions/55617034/…,我认为可以帮助我。我将通过这个来查看它是否有效,并且一旦我到达某个地方就会回复。再次感谢阿尼尔——你的建议似乎为我指明了正确的方向。真的很感激。以上是关于PySpark - 添加一个递增的数字列,该列根据另一列值的变化重置为 1的主要内容,如果未能解决你的问题,请参考以下文章
如何从 PySpark 中的多个列创建字典列表,其中键是列名,值是该列的值?
Pyspark Dataframe Imputations - 根据指定条件用列平均值替换未知和缺失值