如何在 pyspark 中创建新列,其中条件取决于列的后续值?
Posted
技术标签:
【中文标题】如何在 pyspark 中创建新列,其中条件取决于列的后续值?【英文标题】:How to create new column in pyspark where the conditional depends on the subsequent values of a column? 【发布时间】:2020-10-15 13:18:06 【问题描述】:我有以下 pyspark 数据框。如下图所示,我需要从 Value1 列创建 Value2 列。问题是对于所有具有相同时间值的行,Value2列中的值需要是Value1中与该时间对应的第一个值。因此,如果您查看时间为 16:07 的所有行的图片,则该值需要为 100。我知道我需要使用某种形式的条件,但我不知道该怎么做,所以我是想知道是否有人能指出我正确的方向。
data=
‘Name’:[‘John’,’Kate’,’William’,’Andrew’,’Nicole’,’Nicola’,’Gavin’,’Gabriel’, ‘John’,’Kate’,’William’,’Andrew’,’Nicole’,’Nicola’,’Gavin’,’Gabriel’],
’Value1’:[10,20,50,60,100,200,250,300,450,520,570,600,630,670,690,700,720],
’Time’:[‘ 15/06/2020 16:05:00’, ‘ 15/06/2020 16:05:00’, ‘ 15/06/2020 16:05:00’, ‘ 15/06/2020 16:06:00’, ‘ 15/06/2020 16:07:00’, ‘ 15/06/2020 16:07:00’, ‘ 15/06/2020 16:08:00’, ‘ 15/06/2020 16:09:00’, ‘ 15/06/2020 16:10:00’, ‘ 15/06/2020 17:20:00’, ‘ 15/06/2020 17:21:00’, ‘ 15/06/2020 17:22:00’, ‘ 15/06/2020 17:22:00’, ‘ 15/06/2020 17:22:00’, ‘ 15/06/2020 17:22:00’, , ‘ 15/06/2020 17:25:00’, , ‘ 15/06/2020 17:26:00’
df=pd.DataFrame(data)
df_spark=spark.createDataFrame(df)
【问题讨论】:
【参考方案1】:你有两个选择:
-
使用窗口函数提取
Value 1
中的第一个值/最小值。注意:如果这花费的时间太长,请使用选项 2。窗口函数通常性能不佳,应避免在大型数据帧上使用。
-
第二个选项是创建一个聚合表并将其连接回您的数据框。如果您的数据集很大但需要 2 个步骤,则这是一种性能更高的方法。
聚合表的查询可能如下所示:
spark.sql("SELECT Name, Time, Min(Value1) FROM Table GROUP BY Name, Time").createOrReplaceTempView("Aggregate_Table")
。最终表的查询将是:
spark.sql("SELECT a.*, b.Time AS Value2 FROM Table a INNER JOIN Aggregate_Table b ON a.Name = b.Name AND a.Time = b.Time")
【讨论】:
嗨@Arjoon,你能分享一些关于`窗口函数的外部资源/参考,一般来说,在大型数据帧上应该避免使用。` 找不到快速直接的参考资料,但我会在这里尝试解释。如果数据帧上没有任何分区,窗口函数会导致数据混洗。在大型数据帧中,这可能是一项内存密集型任务,可能需要很长时间,具体取决于整体 Spark 查询计划和可用资源。如果数据框由窗口函数中 PARTITION 子句调用的同一列进行分区,则此任务将按此处所述执行:***.com/questions/54332942/… 感谢您的解释:) 我想确认如果分区正确,Windows 不会那么糟糕,您分享的答案也证实了这一点。【参考方案2】:试试窗口函数。您在“时间”列定义的窗口中获得最小值 1:
from pyspark.sql import Window
window = Window.partitionBy('Time')
df_spark.withColumn('Value2', min('Value1').over(window))
【讨论】:
以上是关于如何在 pyspark 中创建新列,其中条件取决于列的后续值?的主要内容,如果未能解决你的问题,请参考以下文章
如何在返回结果中创建新字段并根据使用 Mongoose 的条件在其中设置值?
如何删除元素如何根据另一个rdd从一个rdd中删除元素并在pyspark中创建新的rdd?
如何基于第二个 DataFrame (Java) 在 Spark DataFrame 中创建新列?