PYSPARK:根据条件用另一个行值更新一行中的值?

Posted

技术标签:

【中文标题】PYSPARK:根据条件用另一个行值更新一行中的值?【英文标题】:PYSPARK : Update value in a row with another row value based on condition? 【发布时间】:2022-01-16 03:58:16 【问题描述】:

我需要在以下条件下根据唯一票证 id(groupby) 更新一些列并返回特定记录:

1.每当状态关闭时 - 特定记录 run_date only 需要在基于 已关闭状态记录closed_time 列 中获取更新在唯一的票证 ID 上。

2.每当状态为进行中时 - 特定记录 run_date only 需要在 已关闭状态记录inprogress_time 列 中获取更新> 基于唯一的工单 ID。(只有 run_date 会在已关闭状态记录的 inprogress_time 中得到更新)。

3.每当状态被取消时-该特定记录run_date只需要根据唯一的票证ID在已取消状态记录cancelled_time列中获取更新。

INPUT DATAFRAME

Id  type  inprogress_time  closed_time  cancelled_time  status    Source_system   Run_date
11  TRUCK   NAN            NAN           NAN            Created     LIBERATE    1/9/2021 12:00
11  TRUCK   NAN            NAN           NAN            In_Progress LIBERATE    1/9/2021 12:00
11  TRUCK   NAN            NAN           NAN            Closed      LIBERATE    8/9/2021 19:21
22  TRUCK   NAN            NAN           NAN            Cancelled   LIBERATE    3/9/2021 15:08
33  TRUCK   NAN            NAN           NAN            Created     LIBERATE    4/10/2021 15:08
33  TRUCK   NAN            NAN           NAN            In_Progress LIBERATE    4/10/2021 15:08
33  TRUCK   NAN            NAN           NAN            Closed      LIBERATE    5/10/2021 15:08

EXPECTED RESULT(OUTPUT DATAFRAME)

Id  type    inprogress_time  closed_time    cancelled_time     status       Source_system  run_date
11  TRUCK   1/9/2021 12:00  8/9/2021 19:21   NAN               Closed        LIBERATE  8/9/2021 19:21
22  TRUCK   NAN             NAN              3/9/2021 15:08    Cancelled     LIBERATE  3/9/2021 15:08                 
33  TRUCK   4/10/2021 15:08 5/10/2021 15:08  NAN               Closed        LIBERATE  5/10/2021 15:08

【问题讨论】:

【参考方案1】:

我认为pivot 效率更高。

df.groupBy('Id', 'type', 'Source_system').pivot('status').agg(f.first('Run_date')) \
  .withColumnRenamed('Cancelled',   'cancelled_time') \
  .withColumnRenamed('Closed',      'closed_time') \
  .withColumnRenamed('In_Progress', 'inprogress_time') \
  .drop('Created') \
  .withColumn('status', f.expr('''
      CASE WHEN cancelled_time  is not null THEN 'Cancelled'
           WHEN closed_time     is not null THEN 'Closed'
           WHEN inprogress_time is not null THEN 'In_Progress'
      ELSE 'Created' END ''')) \
  .show(truncate=False)

+---+-----+-------------+--------------+---------------+---------------+---------+
|Id |type |Source_system|cancelled_time|closed_time    |inprogress_time|status   |
+---+-----+-------------+--------------+---------------+---------------+---------+
|33 |TRUCK|LIBERATE     |null          |5/10/2021 15:08|4/10/2021 15:08|Closed   |
|11 |TRUCK|LIBERATE     |null          |8/9/2021 19:21 |1/9/2021 12:00 |Closed   |
|22 |TRUCK|LIBERATE     |3/9/2021 15:08|null           |null           |Cancelled|
+---+-----+-------------+--------------+---------------+---------------+---------+

【讨论】:

谢谢!!非常有帮助..它对我有用。但是如果我想将特定的 run_date 列也添加到输出数据框意味着我应该怎么做?我尝试在 groupby 中添加 run_date 列。但它返回了我不想要的另一列。因此,我需要在输出数据帧中再包含一列具有特定运行日期的列。你能帮我解决这个问题吗? 具体是什么意思?什么逻辑?我没明白你的确切意思。如果您需要更多,请修改您的问题或发布另一个问题。 请查看更新后的问题。 Atlast ,我还想添加 run_date 列。你能帮我解决这个问题吗...我尝试在 groupby 中添加 run_date 但这给出了另一行我不想要的@Lamanus 我尝试在 groupby 中添加 run_date 列,但没有成功。示例:df.groupBy('Id', 'type', 'Source_system','Run_date').pivot('status').agg(f.first('Run_date')) 我认为您不能简单地通过 groupby 添加它,而是稍后从 *_time 列中返回。例如,.withColumn('Run_date', f.coalesce('cancelled_time', 'closed_time'))

以上是关于PYSPARK:根据条件用另一个行值更新一行中的值?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:对于每一行,根据条件计算另一个表

根据同一行中的行值逐行更新列值

用另一个表中的值更新一个表

用另一个表中的值更新一个表

MySQL:用其他行值计算和更新一行?

在pyspark数据框中根据group by连接行值