How to update a row of an old dataframe with the value of a new dataframe with the same customer id in PySpark
Posted: 2020-05-14 13:11:30

I have to update the old dataframe (old_df) with the new values from the new dataframe (df_new) for each matching consumer_id, and store the output in output_df/df_1.
Only the values in the trigger_1, trigger_2 and trigger_4 columns need to be updated.
I used '' instead of null in the code.
old_df:
+----------+-------------------+------------+----------+----------+---------+
|country_cd|        consumer_id|   person_id| trigger_1| trigger_2|trigger_4|
+----------+-------------------+------------+----------+----------+---------+
|        CZ|             799618|       23899|25-02-2020|      null|     null|
|        CZ|             799625|       23958|07-09-2017|      null|     null|
|        CZ|             799627|       23972|15-02-2018|      null|     null|
|        CZ|             799631|       23984|16-07-2019|      null|     null|
|        CZ|             892599|       23993|30-11-2017|      null|     null|
|        CZ|             799633|       23997|      null|      null|     null|
|        CZ|             799634|       24000|24-03-2020|      null|     null|
|        CZ|             799635|       24007|27-09-2018|      null|     null|
|        CZ|             275701|       24023|27-02-2019|      null|     null|
|        CZ|             799637|       24028|      null|22-10-2019|     null|
|        CZ|             799638|       24029|22-10-2019|      null|     null|
|        CZ|             269398|       24033|15-10-2019|      null|     null|
+----------+-------------------+------------+----------+----------+---------+
df_new:
+----------+-------------------+------------+----------+-----------+---------+
|country_cd|        consumer_id|   person_id| trigger_1|  trigger_2|trigger_4|
+----------+-------------------+------------+----------+-----------+---------+
|        CZ|             799618|       23899|          | 15-02-2020|         |
|        CZ|             799625|       23958|          | 03-07-2020|         |
|        CZ|             799627|       23972|15-02-2020|           |         |
+----------+-------------------+------------+----------+-----------+---------+
output_df/df_1:
+----------+-------------------+------------+----------+----------+---------+
|country_cd|        consumer_id|   person_id| trigger_1| trigger_2|trigger_4|
+----------+-------------------+------------+----------+----------+---------+
|        CZ|             799618|       23899|25-02-2020|15-02-2020|     null|
|        CZ|             799625|       23958|07-09-2017|03-07-2020|     null|
|        CZ|             799627|       23972|15-02-2020|      null|     null|
|        CZ|             799631|       23984|16-07-2019|      null|     null|
|        CZ|             892599|       23993|30-11-2017|      null|     null|
|        CZ|             799633|       23997|      null|      null|     null|
|        CZ|             799634|       24000|24-03-2020|      null|     null|
|        CZ|             799635|       24007|27-09-2018|      null|     null|
|        CZ|             275701|       24023|27-02-2019|      null|     null|
|        CZ|             799637|       24028|      null|22-10-2019|     null|
|        CZ|             799638|       24029|22-10-2019|      null|     null|
|        CZ|             269398|       24033|15-10-2019|      null|     null|
+----------+-------------------+------------+----------+----------+---------+
What I tried:
old_df = spark.createDataFrame(
    [
        ('CZ',799618,23899,'25-02-2020', '',''),
        ('CZ',799625,23958,'07-09-2017', '',''),
        ('CZ',799627,23972,'15-02-2018', '',''),
        ('CZ',799631,23984,'16-07-2019', '',''),
        ('CZ',892599,23993,'30-11-2017', '',''),
        ('CZ',799633,23997, '', '',''),
        ('CZ',799634,24000,'24-03-2020', '',''),
        ('CZ',799635,24007,'27-09-2018', '',''),
        ('CZ',275701,24023,'27-02-2019', '',''),
        ('CZ',799637,24028, '','22-10-2019',''),
        ('CZ',799638,24029,'22-10-2019', '',''),
        ('CZ',269398,24033,'15-10-2019', '','')
    ],
    ["country_cd","consumer_id","person_id","trigger_1","trigger_2","trigger_4"])
old_df.show()
df_new = spark.createDataFrame(
    [
        ('CZ',203001799618,23899,'','3/5/2020',''),
        ('CZ',203001799625,23958,'','3/7/2020',''),
        ('CZ',203001799627,23972,'3/15/2020','','')
    ],
    ["country_cd","consumer_id","person_id","trigger_1","trigger_2","trigger_4"])
df_new.show()
from pyspark.sql.functions import *
a = df_new.select("consumer_id").distinct().collect()
l = []
for x in a:
    l.append(x[0])
print(l)
c = ['CZ']
# include only records with consumer_ids in list l
df_1 = old_df.where(old_df.consumer_id.isin(l) & old_df.country_cd.isin(c))
df_1 = df_1.withColumn('trigger_1',when(length(df_new.trigger_1)>1,df_new.trigger_1).otherwise(df_1.trigger_1))
df_1 =df_1.withColumn('trigger_2',when(length(df_new.trigger_2)>1,df_new.trigger_2).otherwise(df_1.trigger_2))
df_1 =df_1.withColumn('trigger_4',when(length(df_new.trigger_4)>1,df_new.trigger_4).otherwise(df_1.trigger_4)).show()
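A note on why this attempt fails: the when(length(df_new.trigger_1) > 1, ...) expressions reference columns of df_new while transforming df_1, and Spark cannot resolve columns from a second DataFrame that has not been joined in; the two frames first have to be combined on the matching key, as the answer below does. The '' placeholders are a second obstacle, because coalesce-style logic only skips real nulls. A minimal sketch for turning those empty strings back into nulls, assuming the trigger columns are plain strings (the blanks_to_null helper is illustrative, not part of the original post):

from pyspark.sql.functions import col, trim, when

# hypothetical helper: replace empty/blank strings with real nulls
def blanks_to_null(df, cols):
    for c in cols:
        df = df.withColumn(c, when(trim(col(c)) == "", None).otherwise(col(c)))
    return df

old_df = blanks_to_null(old_df, ["trigger_1", "trigger_2", "trigger_4"])
df_new = blanks_to_null(df_new, ["trigger_1", "trigger_2", "trigger_4"])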
Answer 1: Use a left join with .contains in the join condition, plus coalesce, to get the desired output.
Example:
from pyspark.sql.functions import col, when, length, trim, coalesce

old_df = spark.createDataFrame(
    [
        ('CZ',799618,23899,'25-02-2020', '',''),
        ('CZ',799625,23958,'07-09-2017', '',''),
        ('CZ',799627,23972,'15-02-2018', '',''),
        ('CZ',799631,23984,'16-07-2019', '',''),
        ('CZ',892599,23993,'30-11-2017', '',''),
        ('CZ',799633,23997, '', '',''),
        ('CZ',799634,24000,'24-03-2020', '',''),
        ('CZ',799635,24007,'27-09-2018', '',''),
        ('CZ',275701,24023,'27-02-2019', '',''),
        ('CZ',799637,24028, '','22-10-2019',''),
        ('CZ',799638,24029,'22-10-2019', '',''),
        ('CZ',269398,24033,'15-10-2019', '','')
    ],
    ["country_cd","consumer_id","ak_person_id","trigger_1","trigger_2","trigger_4"])

# turn the '' placeholders into real nulls so coalesce can overwrite them later
old_df = old_df.withColumn("trigger_1", when(length(trim(col("trigger_1"))) > 1, col("trigger_1")).otherwise(None)).\
    withColumn("trigger_2", when(length(trim(col("trigger_2"))) > 1, col("trigger_2")).otherwise(None)).\
    withColumn("trigger_4", when(length(trim(col("trigger_4"))) > 1, col("trigger_4")).otherwise(None))
old_df.show()
df_new = spark.createDataFrame(
    [
        ('CZ',203001799618,23899,'','3/5/2020',''),
        ('CZ',203001799625,23958,'','3/7/2020',''),
        ('CZ',203001799627,23972,'3/15/2020','','')
    ],
    ["country_cd","consumer_id","person_id","trigger_1","trigger_2","trigger_4"])

# same empty-string-to-null cleanup for the new frame
df_new = df_new.withColumn("trigger_1", when(length(trim(col("trigger_1"))) > 1, col("trigger_1")).otherwise(None)).\
    withColumn("trigger_2", when(length(trim(col("trigger_2"))) > 1, col("trigger_2")).otherwise(None)).\
    withColumn("trigger_4", when(length(trim(col("trigger_4"))) > 1, col("trigger_4")).otherwise(None))
df_new.show()
old_df.alias("t1").join(df_new.alias("t2"),col("t2.consumer_id").contains(col("t1.consumer_id")),"left").\
select(col("t1.country_cd"),col("t1.consumer_id"),col("t1.ak_person_id"),coalesce("t2.trigger_1","t1.trigger_1").alias("trigger_1"),coalesce("t2.trigger_2","t1.trigger_2").alias("trigger_2"),coalesce("t2.trigger_4","t1.trigger_4").alias("trigger_4")).\
withColumn("trigger_1", when(length(trim(col("trigger_1")))>1,col("trigger_1")).otherwise(None)).\
withColumn("trigger_2", when(length(trim(col("trigger_2")))>1,col("trigger_2")).otherwise(None)).\
withColumn("trigger_4", when(length(trim(col("trigger_4")))>1,col("trigger_4")).otherwise(None)).\
show()
#+----------+-----------+------------+----------+----------+---------+
#|country_cd|consumer_id|ak_person_id| trigger_1| trigger_2|trigger_4|
#+----------+-----------+------------+----------+----------+---------+
#|        CZ|     799618|       23899|25-02-2020|  3/5/2020|     null|
#|        CZ|     799625|       23958|07-09-2017|  3/7/2020|     null|
#|        CZ|     799627|       23972| 3/15/2020|      null|     null|
#|        CZ|     799631|       23984|16-07-2019|      null|     null|
#|        CZ|     892599|       23993|30-11-2017|      null|     null|
#|        CZ|     799633|       23997|      null|      null|     null|
#|        CZ|     799634|       24000|24-03-2020|      null|     null|
#|        CZ|     799635|       24007|27-09-2018|      null|     null|
#|        CZ|     275701|       24023|27-02-2019|      null|     null|
#|        CZ|     799637|       24028|      null|22-10-2019|     null|
#|        CZ|     799638|       24029|22-10-2019|      null|     null|
#|        CZ|     269398|       24033|15-10-2019|      null|     null|
#+----------+-----------+------------+----------+----------+---------+
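A note on the join condition: .contains only works because each new consumer_id (e.g. 203001799618) happens to embed the old one (799618) as a substring. If the two frames reliably share the person key, an equality join is the safer choice. A hedged sketch of that variant, reusing the cleaned old_df/df_new built above (assumption: ak_person_id in old_df and person_id in df_new identify the same person):

# equality join on the person key instead of the substring-based contains join
result = old_df.alias("t1").join(df_new.alias("t2"), col("t1.ak_person_id") == col("t2.person_id"), "left").\
    select(col("t1.country_cd"), col("t1.consumer_id"), col("t1.ak_person_id"),
           coalesce("t2.trigger_1", "t1.trigger_1").alias("trigger_1"),
           coalesce("t2.trigger_2", "t1.trigger_2").alias("trigger_2"),
           coalesce("t2.trigger_4", "t1.trigger_4").alias("trigger_4"))
result.show()

Either way, coalesce keeps the old trigger value whenever the new frame has no update for that row.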