How to update a row of old dataframe with value of new dataframe with same customer id in pyspark

Posted: 2020-05-14 13:11:30

I have to update the old dataframe (old_df) with the new values from df_new for rows with a matching consumer_id, and store the output in output_df/df_1.

Only the values in the trigger_1, trigger_2 and trigger_4 columns need to be updated.

In my code I have used '' instead of null.

old_df:

+----------+-------------------+------------+----------+----------+---------+
|country_cd|        consumer_id|   person_id|trigger_1 | trigger_2|trigger_4|
+----------+-------------------+------------+----------+----------+---------+
|        CZ|             799618|       23899|25-02-2020|      null|     null|
|        CZ|             799625|       23958|07-09-2017|      null|     null|
|        CZ|             799627|       23972|15-02-2018|      null|     null|
|        CZ|             799631|       23984|16-07-2019|      null|     null|
|        CZ|             892599|       23993|30-11-2017|      null|     null|
|        CZ|             799633|       23997|      null|      null|     null|
|        CZ|             799634|       24000|24-03-2020|      null|     null|
|        CZ|             799635|       24007|27-09-2018|      null|     null|
|        CZ|             275701|       24023|27-02-2019|      null|     null|
|        CZ|             799637|       24028|      null|22-10-2019|     null|
|        CZ|             799638|       24029|22-10-2019|      null|     null|
|        CZ|             269398|       24033|15-10-2019|      null|     null|
+----------+-------------------+------------+----------+----------+---------+

df_new:

+----------+-------------------+------------+----------+-----------+---------+
|country_cd|        consumer_id|   person_id|trigger_1 | trigger_2 |trigger_4|
+----------+-------------------+------------+----------+-----------+---------+
|        CZ|             799618|       23899|          | 15-02-2020|         |
|        CZ|             799625|       23958|          | 03-07-2020|         |
|        CZ|             799627|       23972|15-02-2020|           |         |
+----------+-------------------+------------+----------+-----------+---------+

output_df/df_1:

+----------+-------------------+------------+----------+----------+---------+
|country_cd|        consumer_id|   person_id|trigger_1 | trigger_2|trigger_4|
+----------+-------------------+------------+----------+----------+---------+
|        CZ|             799618|       23899|25-02-2020|15-02-2020|     null|
|        CZ|             799625|       23958|07-09-2017|03-07-2020|     null|
|        CZ|             799627|       23972|15-02-2020|      null|     null|
|        CZ|             799631|       23984|16-07-2019|      null|     null|
|        CZ|             892599|       23993|30-11-2017|      null|     null|
|        CZ|             799633|       23997|      null|      null|     null|
|        CZ|             799634|       24000|24-03-2020|      null|     null|
|        CZ|             799635|       24007|27-09-2018|      null|     null|
|        CZ|             275701|       24023|27-02-2019|      null|     null|
|        CZ|             799637|       24028|      null|22-10-2019|     null|
|        CZ|             799638|       24029|22-10-2019|      null|     null|
|        CZ|             269398|       24033|15-10-2019|      null|     null|
+----------+-------------------+------------+----------+----------+---------+

What I have tried:

old_df = spark.createDataFrame(
[
('CZ',799618,23899,'25-02-2020',      '',''),
('CZ',799625,23958,'07-09-2017',      '',''),
('CZ',799627,23972,'15-02-2018',      '',''),
('CZ',799631,23984,'16-07-2019',      '',''),
('CZ',892599,23993,'30-11-2017',      '',''),
('CZ',799633,23997,          '',      '',''),
('CZ',799634,24000,'24-03-2020',      '',''),
('CZ',799635,24007,'27-09-2018',      '',''),
('CZ',275701,24023,'27-02-2019',      '',''),
('CZ',799637,24028,          '','22-10-2019',''),
('CZ',799638,24029,'22-10-2019',      '',''),
('CZ',269398,24033,'15-10-2019',      '','')
]
,["country_cd","consumer_id","person_id","trigger_1","trigger_2","trigger_4"])
old_df.show()

df_new = spark.createDataFrame(
[
('CZ',203001799618,23899,'','3/5/2020',''),
('CZ',203001799625,23958,'','3/7/2020',''),
('CZ',203001799627,23972,'3/15/2020','','')
]
,["country_cd","consumer_id","person_id","trigger_1","trigger_2","trigger_4"])
df_new.show()

from pyspark.sql.functions import *

a = df_new.select("consumer_id").distinct().collect()
l = []
for x in a:
  l.append(x[0])
print(l)
c = ['CZ']
# include only records with consumer_ids in list l
df_1 = old_df.where(old_df.consumer_id.isin(l) & old_df.country_cd.isin(c))

# try to overwrite the trigger columns with the values from df_new
df_1 = df_1.withColumn('trigger_1',when(length(df_new.trigger_1)>1,df_new.trigger_1).otherwise(df_1.trigger_1))
df_1 = df_1.withColumn('trigger_2',when(length(df_new.trigger_2)>1,df_new.trigger_2).otherwise(df_1.trigger_2))
df_1 = df_1.withColumn('trigger_4',when(length(df_new.trigger_4)>1,df_new.trigger_4).otherwise(df_1.trigger_4))
df_1.show()


Answer 1:

Use a left join with .contains in the join condition and coalesce to get the desired output. (.contains is used here because the consumer_id values in df_new carry an extra prefix, e.g. 203001799618 contains 799618 from old_df.)

Example:

from pyspark.sql.functions import col, when, length, trim, coalesce

old_df = spark.createDataFrame(
[
('CZ',799618,23899,'25-02-2020',      '',''),
('CZ',799625,23958,'07-09-2017',      '',''),
('CZ',799627,23972,'15-02-2018',      '',''),
('CZ',799631,23984,'16-07-2019',      '',''),
('CZ',892599,23993,'30-11-2017',      '',''),
('CZ',799633,23997,          '',      '',''),
('CZ',799634,24000,'24-03-2020',      '',''),
('CZ',799635,24007,'27-09-2018',      '',''),
('CZ',275701,24023,'27-02-2019',      '',''),
('CZ',799637,24028,          '','22-10-2019',''),
('CZ',799638,24029,'22-10-2019',     '' ,''),
('CZ',269398,24033,'15-10-2019',     '' ,''),
]
,["country_cd","consumer_id","ak_person_id","trigger_1","trigger_2","trigger_4"]).withColumn("trigger_1", when(length(trim(col("trigger_1")))>1,col("trigger_1")).otherwise(None)).withColumn("trigger_2", when(length(trim(col("trigger_2")))>1,col("trigger_2")).otherwise(None)).withColumn("trigger_4", when(length(trim(col("trigger_4")))>1,col("trigger_4")).otherwise(None))
old_df.show()

df_new = spark.createDataFrame(
[
('CZ',203001799618,23899,'','3/5/2020',''),
('CZ',203001799625,23958,'','3/7/2020',''),
('CZ',203001799627,23972,'3/15/2020','','')
]
,["country_cd","consumer_id","person_id","trigger_1","trigger_2","trigger_4"]).withColumn("trigger_1", when(length(trim(col("trigger_1")))>1,col("trigger_1")).otherwise(None)).withColumn("trigger_2", when(length(trim(col("trigger_2")))>1,col("trigger_2")).otherwise(None)).withColumn("trigger_4", when(length(trim(col("trigger_4")))>1,col("trigger_4")).otherwise(None))
df_new.show()

    old_df.alias("t1").join(df_new.alias("t2"),col("t2.consumer_id").contains(col("t1.consumer_id")),"left").\
select(col("t1.country_cd"),col("t1.consumer_id"),col("t1.ak_person_id"),coalesce("t2.trigger_1","t1.trigger_1").alias("trigger_1"),coalesce("t2.trigger_2","t1.trigger_2").alias("trigger_2"),coalesce("t2.trigger_4","t1.trigger_4").alias("trigger_4")).\
withColumn("trigger_1", when(length(trim(col("trigger_1")))>1,col("trigger_1")).otherwise(None)).\
withColumn("trigger_2", when(length(trim(col("trigger_2")))>1,col("trigger_2")).otherwise(None)).\
withColumn("trigger_4", when(length(trim(col("trigger_4")))>1,col("trigger_4")).otherwise(None)).\
show()

#+----------+-----------+------------+----------+----------+---------+
#|country_cd|consumer_id|ak_person_id| trigger_1| trigger_2|trigger_4|
#+----------+-----------+------------+----------+----------+---------+
#|        CZ|     799618|       23899|25-02-2020|  3/5/2020|     null|
#|        CZ|     799625|       23958|07-09-2017|  3/7/2020|     null|
#|        CZ|     799627|       23972| 3/15/2020|      null|     null|
#|        CZ|     799631|       23984|16-07-2019|      null|     null|
#|        CZ|     892599|       23993|30-11-2017|      null|     null|
#|        CZ|     799633|       23997|      null|      null|     null|
#|        CZ|     799634|       24000|24-03-2020|      null|     null|
#|        CZ|     799635|       24007|27-09-2018|      null|     null|
#|        CZ|     275701|       24023|27-02-2019|      null|     null|
#|        CZ|     799637|       24028|      null|22-10-2019|     null|
#|        CZ|     799638|       24029|22-10-2019|      null|     null|
#|        CZ|     269398|       24033|15-10-2019|      null|     null|
#+----------+-----------+------------+----------+----------+---------+
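
Side note: the .contains join is only needed because the consumer_id values in df_new carry a leading prefix (203001799618 vs 799618 in old_df). If both dataframes held identical consumer_id values, as in the tables at the top of the question, the same update could be written as a plain equality join plus coalesce. A minimal sketch under that assumption, using the question's column names (person_id) and assuming the empty strings have already been converted to real nulls as above; updated_df is just an illustrative name:

from pyspark.sql.functions import col, coalesce

# Left join old_df to df_new on consumer_id; unmatched rows get nulls on the "n" side,
# so coalesce(new, old) keeps the original value for them.
updated_df = old_df.alias("o").join(
        df_new.alias("n"),
        col("o.consumer_id") == col("n.consumer_id"),
        "left"
    ).select(
        col("o.country_cd"),
        col("o.consumer_id"),
        col("o.person_id"),
        coalesce(col("n.trigger_1"), col("o.trigger_1")).alias("trigger_1"),
        coalesce(col("n.trigger_2"), col("o.trigger_2")).alias("trigger_2"),
        coalesce(col("n.trigger_4"), col("o.trigger_4")).alias("trigger_4"),
    )

updated_df.show()

Either way the pattern is the same: left join on the key so every old row survives, then coalesce the new trigger value over the old one per column.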

