PYSPARK:根据条件用另一个行值更新一行中的值?
Posted
技术标签:
【中文标题】PYSPARK:根据条件用另一个行值更新一行中的值?【英文标题】:PYSPARK : Update value in a row with another row value based on condition? 【发布时间】:2022-01-16 03:58:16 【问题描述】:我需要在以下条件下根据唯一票证 id(groupby) 更新一些列并返回特定记录:
1.每当状态关闭时 - 特定记录 run_date only 需要在基于 已关闭状态记录 的 closed_time 列 中获取更新在唯一的票证 ID 上。
2.每当状态为进行中时 - 特定记录 run_date only 需要在 已关闭状态记录的 inprogress_time 列 中获取更新> 基于唯一的工单 ID。(只有 run_date 会在已关闭状态记录的 inprogress_time 中得到更新)。
3.每当状态被取消时-该特定记录run_date只需要根据唯一的票证ID在已取消状态记录的cancelled_time列中获取更新。
INPUT DATAFRAME
Id type inprogress_time closed_time cancelled_time status Source_system Run_date
11 TRUCK NAN NAN NAN Created LIBERATE 1/9/2021 12:00
11 TRUCK NAN NAN NAN In_Progress LIBERATE 1/9/2021 12:00
11 TRUCK NAN NAN NAN Closed LIBERATE 8/9/2021 19:21
22 TRUCK NAN NAN NAN Cancelled LIBERATE 3/9/2021 15:08
33 TRUCK NAN NAN NAN Created LIBERATE 4/10/2021 15:08
33 TRUCK NAN NAN NAN In_Progress LIBERATE 4/10/2021 15:08
33 TRUCK NAN NAN NAN Closed LIBERATE 5/10/2021 15:08
EXPECTED RESULT(OUTPUT DATAFRAME)
Id type inprogress_time closed_time cancelled_time status Source_system run_date
11 TRUCK 1/9/2021 12:00 8/9/2021 19:21 NAN Closed LIBERATE 8/9/2021 19:21
22 TRUCK NAN NAN 3/9/2021 15:08 Cancelled LIBERATE 3/9/2021 15:08
33 TRUCK 4/10/2021 15:08 5/10/2021 15:08 NAN Closed LIBERATE 5/10/2021 15:08
【问题讨论】:
【参考方案1】:我认为pivot
效率更高。
df.groupBy('Id', 'type', 'Source_system').pivot('status').agg(f.first('Run_date')) \
.withColumnRenamed('Cancelled', 'cancelled_time') \
.withColumnRenamed('Closed', 'closed_time') \
.withColumnRenamed('In_Progress', 'inprogress_time') \
.drop('Created') \
.withColumn('status', f.expr('''
CASE WHEN cancelled_time is not null THEN 'Cancelled'
WHEN closed_time is not null THEN 'Closed'
WHEN inprogress_time is not null THEN 'In_Progress'
ELSE 'Created' END ''')) \
.show(truncate=False)
+---+-----+-------------+--------------+---------------+---------------+---------+
|Id |type |Source_system|cancelled_time|closed_time |inprogress_time|status |
+---+-----+-------------+--------------+---------------+---------------+---------+
|33 |TRUCK|LIBERATE |null |5/10/2021 15:08|4/10/2021 15:08|Closed |
|11 |TRUCK|LIBERATE |null |8/9/2021 19:21 |1/9/2021 12:00 |Closed |
|22 |TRUCK|LIBERATE |3/9/2021 15:08|null |null |Cancelled|
+---+-----+-------------+--------------+---------------+---------------+---------+
【讨论】:
谢谢!!非常有帮助..它对我有用。但是如果我想将特定的 run_date 列也添加到输出数据框意味着我应该怎么做?我尝试在 groupby 中添加 run_date 列。但它返回了我不想要的另一列。因此,我需要在输出数据帧中再包含一列具有特定运行日期的列。你能帮我解决这个问题吗? 具体是什么意思?什么逻辑?我没明白你的确切意思。如果您需要更多,请修改您的问题或发布另一个问题。 请查看更新后的问题。 Atlast ,我还想添加 run_date 列。你能帮我解决这个问题吗...我尝试在 groupby 中添加 run_date 但这给出了另一行我不想要的@Lamanus 我尝试在 groupby 中添加 run_date 列,但没有成功。示例:df.groupBy('Id', 'type', 'Source_system','Run_date').pivot('status').agg(f.first('Run_date')) 我认为您不能简单地通过 groupby 添加它,而是稍后从 *_time 列中返回。例如,.withColumn('Run_date', f.coalesce('cancelled_time', 'closed_time'))
。以上是关于PYSPARK:根据条件用另一个行值更新一行中的值?的主要内容,如果未能解决你的问题,请参考以下文章