如何在 pyspark 中创建新列,其中条件取决于列的后续值?

Posted

技术标签:

【中文标题】如何在 pyspark 中创建新列,其中条件取决于列的后续值?【英文标题】:How to create new column in pyspark where the conditional depends on the subsequent values of a column? 【发布时间】:2020-10-15 13:18:06 【问题描述】:

我有以下 pyspark 数据框。如下图所示,我需要从 Value1 列创建 Value2 列。问题是对于所有具有相同时间值的行,Value2列中的值需要是Value1中与该时间对应的第一个值。因此,如果您查看时间为 16:07 的所有行的图片,则该值需要为 100。我知道我需要使用某种形式的条件,但我不知道该怎么做,所以我是想知道是否有人能指出我正确的方向。

data=
    ‘Name’:[‘John’,’Kate’,’William’,’Andrew’,’Nicole’,’Nicola’,’Gavin’,’Gabriel’, ‘John’,’Kate’,’William’,’Andrew’,’Nicole’,’Nicola’,’Gavin’,’Gabriel’],
    ’Value1’:[10,20,50,60,100,200,250,300,450,520,570,600,630,670,690,700,720],
    ’Time’:[‘ 15/06/2020  16:05:00’, ‘ 15/06/2020  16:05:00’, ‘ 15/06/2020  16:05:00’, ‘ 15/06/2020  16:06:00’, ‘ 15/06/2020  16:07:00’, ‘ 15/06/2020  16:07:00’, ‘ 15/06/2020  16:08:00’, ‘ 15/06/2020  16:09:00’, ‘ 15/06/2020  16:10:00’, ‘ 15/06/2020  17:20:00’, ‘ 15/06/2020  17:21:00’, ‘ 15/06/2020  17:22:00’, ‘ 15/06/2020  17:22:00’, ‘ 15/06/2020  17:22:00’, ‘ 15/06/2020  17:22:00’, , ‘ 15/06/2020  17:25:00’, , ‘ 15/06/2020  17:26:00’

df=pd.DataFrame(data)
df_spark=spark.createDataFrame(df)

【问题讨论】:

【参考方案1】:

你有两个选择:

    使用窗口函数提取Value 1 中的第一个值/最小值。注意:如果这花费的时间太长,请使用选项 2。窗口函数通常性能不佳,应避免在大型数据帧上使用。
PySpark Documentation on Window Functions Example of window functions in SQL
    第二个选项是创建一个聚合表并将其连接回您的数据框。如果您的数据集很大但需要 2 个步骤,则这是一种性能更高的方法。 聚合表的查询可能如下所示: spark.sql("SELECT Name, Time, Min(Value1) FROM Table GROUP BY Name, Time").createOrReplaceTempView("Aggregate_Table")。最终表的查询将是: spark.sql("SELECT a.*, b.Time AS Value2 FROM Table a INNER JOIN Aggregate_Table b ON a.Name = b.Name AND a.Time = b.Time")

【讨论】:

嗨@Arjoon,你能分享一些关于`窗口函数的外部资源/参考,一般来说,在大型数据帧上应该避免使用。` 找不到快速直接的参考资料,但我会在这里尝试解释。如果数据帧上没有任何分区,窗口函数会导致数据混洗。在大型数据帧中,这可能是一项内存密集型任务,可能需要很长时间,具体取决于整体 Spark 查询计划和可用资源。如果数据框由窗口函数中 PARTITION 子句调用的同一列进行分区,则此任务将按此处所述执行:***.com/questions/54332942/… 感谢您的解释:) 我想确认如果分区正确,Windows 不会那么糟糕,您分享的答案也证实了这一点。【参考方案2】:

试试窗口函数。您在“时间”列定义的窗口中获得最小值 1:

    from pyspark.sql import Window
    window = Window.partitionBy('Time')
    df_spark.withColumn('Value2', min('Value1').over(window))

【讨论】:

以上是关于如何在 pyspark 中创建新列,其中条件取决于列的后续值?的主要内容,如果未能解决你的问题,请参考以下文章

如何在返回结果中创建新字段并根据使用 Mongoose 的条件在其中设置值?

如何删除元素如何根据另一个rdd从一个rdd中删除元素并在pyspark中创建新的rdd?

如何基于第二个 DataFrame (Java) 在 Spark DataFrame 中创建新列?

如何在 python+numpy/pandas 中使用二值化在 json 文件中创建新列

如何在选择查询中创建新列

如何避免在数据库中创建新列