如何在特殊条件下添加具有不同行的两列?

Posted

技术标签:

【中文标题】如何在特殊条件下添加具有不同行的两列?【英文标题】:How do I add two columns with different rows with special condition? 【发布时间】:2019-02-19 09:17:23 【问题描述】:

您好,我有一个 PySpark 数据框。所以,我想在特殊条件下添加来自不同行的两列。其中一列是日期类型。

以下是数据示例:

--------------------------------
| flag|      date     |  diff  |
--------------------------------
| 1   |   2014-05-31  | 0      |
--------------------------------
| 2   |   2014-06-02  | 2      |
--------------------------------
| 3   |   2016-01-14  | 591    |
--------------------------------
| 1   |   2016-07-08  | 0      |
--------------------------------
| 2   |   2016-07-12  | 4      |
--------------------------------

目前我只知道如何使用以下代码添加两列:

from pyspark.sql.functions import expr
dataframe.withColumn("new_column", expr("date_add(date_column, int_column)"))

预期结果

有一个名为“new_date”的新列,它是通过将“diff”列添加到“date column”的结果。

问题是有一个特殊条件:如果“flag”为 1,“date”和“diff”来自同一行,如果不是,“date”来自上一行.

我知道在这种情况下,我的数据必须正确排序。

如果有人可以帮助我,我将不胜感激。谢谢。

【问题讨论】:

flag为1时diff是否总是0? @gaw 很遗憾没有 第一行应该发生什么?没有之前的日期,如果flag 设置为1,是否应该将diff 添加到之前的日期?或者应该只是上一个日期在new_date 列中 @gaw 第一行的 "flag" 值始终为 1。如果 "flag" 值为 1,则添加的 "date" 和 "diff" 来自同一行。为了清楚起见,“flag”意味着像第 n 次使用。 【参考方案1】:

您只需使用 Window 创建一个包含上一个日期的列,并根据 'flag' 的值构造新列

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window().partitionBy().orderBy(F.col('date'))

dataframe = dataframe.withColumn('previous_date', F.lag('date', 1).over(w))

dataframe = dataframe.withColumn('new_date',
                                 F.when(F.col('flag')==1,
                                        F.expr("date_add(previous_date, diff)")
                                        ).otherwise(F.expr("date_add(date, diff)"))
                                ).drop('previous_date')

【讨论】:

您可以将dataframe = dataframe.withColumn('previous_date', F.lag('date', 1).over(w)) 和测试替换为dataframe = dataframe.withColumn('previous_date', F.coalesce(F.lag('date').over(w), F.col("date"))。之后无需使用when 您不能,因为您的列中总会有上一个日期(不包括第一行)。根据我的理解,如果不以“标志”的价值为条件,你就无法逃脱。 是的,@XavierCanton 是对的。我会试试你的答案。希望它能解决我的问题。 我认为您在这里遇到了一些语法问题。如果我没记错的话,括号不完全匹配。在我的环境中,date_add 函数不能直接作用于列。但可能是环境问题 你是对的,有一个缺少的括号,我将编辑我的答案。 date_add 相同,我将使用 expr 进行编辑【参考方案2】:

以防万一您对 Xavier 的回答有同样的问题。思路是一样的,但是我去掉了Window的一些不必要的条件,修复了语法错误,以及我在尝试他的版本时遇到的date_add错误。

from pyspark.sql.functions import *
df1 = spark.createDataFrame([(1,datetime.date(2014,5,31),0),(2,datetime.date(2014,6,2),2),(3,datetime.date(2016,1,14),591),(1,datetime.date(2016,7,8),0),(2,datetime.date(2016,7,12),4)], ["flag","date","diff"])

w = Window.orderBy(col("date"))
df1 = df1.withColumn('previous_date', lag('date', 1).over(w))
df1 = df1.withColumn('new_date',when(col('flag')==1,\
expr('date_add(date, diff)'))\
.otherwise(expr('date_add(previous_date,diff)'))).drop('previous_date')
df1.show()

输出:

+----+----------+----+----------+
|flag|      date|diff|  new_date|
+----+----------+----+----------+
|   1|2014-05-31|   0|2014-05-31|
|   2|2014-06-02|   2|2014-06-02|
|   3|2016-01-14| 591|2016-01-14|
|   1|2016-07-08|   0|2016-07-08|
|   2|2016-07-12|   4|2016-07-12|
+----+----------+----+----------+

【讨论】:

随时为有帮助的答案投票并标记正确答案,以显示此问题已回答 是的,我不能这样做,我的声望仍然低于 15。一旦我符合条件就会这样做。

以上是关于如何在特殊条件下添加具有不同行的两列?的主要内容,如果未能解决你的问题,请参考以下文章

具有 NA 的条件最少的两列

如何将具有默认值的两列添加到配置单元中的现有表?

Hive:如何比较 WHERE 子句中具有复杂数据类型的两列?

如何基于每个数据框中具有不同名称的两列将两个数据框与 dplyr 连接起来? [复制]

连接具有相同标题但在不同表中的两列

如何减去 Power Query 数据透视表中的两列?