PySpark: Merging two DataFrames when one of two join conditions is met
Posted: 2018-02-13 03:30:44

【Question】:
Q: Is there a way to merge two DataFrames when either one of two join conditions is satisfied?
For example, I have two DataFrames:
DF1
name Exam
Ahmad 100
Ahmad 95
Ahmad 90
Emma 80
Emma 85
The second DataFrame:
DF2
name math phy. prev._Rank
Ahmad 100 90 2
Emma 80 85 1
The DataFrame I want to end up with is:
NEW DF
name Exam math phy. Prev._Rank
Ahmad 100 100 90 2
Ahmad 95 null null 2
Ahmad 90 100 90 2
Emma 80 80 85 1
Emma 85 80 85 1
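In other words, the join needed here is a left join on "same name, and Exam equal to either math or phy."; additionally, row 2 of NEW DF expects Prev._Rank to be carried over even when neither column matches. A minimal, hypothetical sketch of just that join condition (assuming the two DataFrames above are available as DF1 and DF2, and that the "phy." column is named phy without the trailing dot, since dots in Spark column names require backtick quoting):

joined = DF1.join(
    DF2,
    (DF1['name'] == DF2['name']) & ((DF1['Exam'] == DF2['math']) | (DF1['Exam'] == DF2['phy'])),
    'left'
)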
【Comments】:
It looks like a left join, but the expected result you posted seems inconsistent, so I don't see the logic... Could you add more detail about what you are trying to achieve?
I want to join the two DFs whenever the value in the math or phy. column matches the value in the Exam column for the same student, keeping the Exam rows that have no match in DF2 as null; it would be even better if Prev._Rank could still be filled in for that student, as in row 2.

【Answer 1】:
DF1=spark.createDataFrame([('Ahmad','100'),('Ahmad','95'),('Ahmad','90'),('Ahmad','50'),('Ahmad','51'),('Ahmad','54'),('Ahmad','53'),('Emma','52'),('Emma','85')],['namea','Exam'])
DF1=DF1.select('namea',DF1.Exam.cast('integer'))
DF1.show()
+-----+----+
|namea|Exam|
+-----+----+
|Ahmad| 100|
|Ahmad| 95|
|Ahmad| 90|
|Ahmad| 50|
|Ahmad| 51|
|Ahmad| 54|
|Ahmad| 53|
| Emma| 52|
| Emma| 85|
+-----+----+
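# Note: this DF1 extends the question's sample with extra exam rows (Ahmad 50/51/53/54, Emma 52) so that both non-matching and phy-matching cases are exercised.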
DF2=spark.createDataFrame([('Ahmad','100','90','2'),('Ahmad','50','54','3'),('Emma','52','85','1')],['name','math','phy','Prev_Rank'])
DF2=DF2.select('name',DF2.math.cast('integer'),DF2.phy.cast('integer'),'Prev_Rank')
DF2.show()
+-----+----+---+---------+
| name|math|phy|Prev_Rank|
+-----+----+---+---------+
|Ahmad| 100| 90| 2|
|Ahmad| 50| 54| 3|
| Emma| 52| 85| 1|
+-----+----+---+---------+
Solution:
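# Step 1: left-join DF1 to DF2 on namea == name and Exam == math; Exam rows that only match phy (or match nothing) stay null here.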
DF3=DF1.join(DF2,[DF1.namea==DF2.name,DF1.Exam==DF2.math],'leftouter')
DF3.show()
+-----+----+-----+----+----+---------+
|namea|Exam| name|math| phy|Prev_Rank|
+-----+----+-----+----+----+---------+
|Ahmad| 90| null|null|null| null|
| Emma| 85| null|null|null| null|
|Ahmad| 50|Ahmad| 50| 54| 3|
|Ahmad| 53| null|null|null| null|
|Ahmad| 54| null|null|null| null|
| Emma| 52| Emma| 52| 85| 1|
|Ahmad| 95| null|null|null| null|
|Ahmad| 100|Ahmad| 100| 90| 2|
|Ahmad| 51| null|null|null| null|
+-----+----+-----+----+----+---------+
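# Step 2: repeat the left join, this time on Exam == phy, renaming DF2's columns so they don't clash with the first join's columns later.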
DF4=DF1.join(DF2,[DF1.namea==DF2.name,DF1.Exam==DF2.phy],'leftouter').withColumnRenamed('name','name1').withColumnRenamed('math','math1').withColumnRenamed('phy','phy1').withColumnRenamed('Prev_Rank','Prev_Rank1')
DF4.show()
+-----+----+-----+-----+----+----------+
|namea|Exam|name1|math1|phy1|Prev_Rank1|
+-----+----+-----+-----+----+----------+
|Ahmad| 90|Ahmad| 100| 90| 2|
| Emma| 85| Emma| 52| 85| 1|
|Ahmad| 50| null| null|null| null|
|Ahmad| 53| null| null|null| null|
|Ahmad| 54|Ahmad| 50| 54| 3|
| Emma| 52| null| null|null| null|
|Ahmad| 95| null| null|null| null|
|Ahmad| 100| null| null|null| null|
|Ahmad| 51| null| null|null| null|
+-----+----+-----+-----+----+----------+
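# Step 3: join the two intermediate results on (namea, Exam) so each exam row carries the outcome of both match attempts side by side.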
DF5=DF4.join(DF3,['namea','Exam'],'inner').orderBy(['namea','Exam'])
DF5.show()
+-----+----+-----+-----+----+----------+-----+----+----+---------+
|namea|Exam|name1|math1|phy1|Prev_Rank1| name|math| phy|Prev_Rank|
+-----+----+-----+-----+----+----------+-----+----+----+---------+
|Ahmad| 50| null| null|null| null|Ahmad| 50| 54| 3|
|Ahmad| 51| null| null|null| null| null|null|null| null|
|Ahmad| 53| null| null|null| null| null|null|null| null|
|Ahmad| 54|Ahmad| 50| 54| 3| null|null|null| null|
|Ahmad| 90|Ahmad| 100| 90| 2| null|null|null| null|
|Ahmad| 95| null| null|null| null| null|null|null| null|
|Ahmad| 100| null| null|null| null|Ahmad| 100| 90| 2|
| Emma| 52| null| null|null| null| Emma| 52| 85| 1|
| Emma| 85| Emma| 52| 85| 1| null|null|null| null|
+-----+----+-----+-----+----+----------+-----+----+----+---------+
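# Step 4: for each column pair, keep whichever join produced a non-null match, then drop the now-redundant columns from the math-side join.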
from pyspark.sql.functions import coalesce

DF6=(DF5.withColumn("name1",coalesce(DF5.name1,DF5.name))
        .withColumn("math1",coalesce(DF5.math1,DF5.math))
        .withColumn("phy1",coalesce(DF5.phy1,DF5.phy))
        .withColumn("Prev_Rank1",coalesce(DF5.Prev_Rank1,DF5.Prev_Rank))
        .drop('name','math','phy','Prev_Rank'))
DF6.show()
+-----+----+-----+-----+----+----------+
|namea|Exam|name1|math1|phy1|Prev_Rank1|
+-----+----+-----+-----+----+----------+
|Ahmad| 50|Ahmad| 50| 54| 3|
|Ahmad| 51| null| null|null| null|
|Ahmad| 53| null| null|null| null|
|Ahmad| 54|Ahmad| 50| 54| 3|
|Ahmad| 90|Ahmad| 100| 90| 2|
|Ahmad| 95| null| null|null| null|
|Ahmad| 100|Ahmad| 100| 90| 2|
| Emma| 52| Emma| 52| 85| 1|
| Emma| 85| Emma| 52| 85| 1|
+-----+----+-----+-----+----+----------+
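# Step 5: within each student, ordered by Exam, forward-fill Prev_Rank1 with the last non-null value so exams without a direct match still get the rank.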
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as fn
DF7=DF6.withColumn("Prev_Rank1", fn.last('Prev_Rank1', True).over(Window.partitionBy('namea').orderBy('Exam').rowsBetween(-sys.maxsize, 0))).drop('name1')
DF7.show()
+-----+----+-----+----+----------+
|namea|Exam|math1|phy1|Prev_Rank1|
+-----+----+-----+----+----------+
|Ahmad| 50| 50| 54| 3|
|Ahmad| 51| null|null| 3|
|Ahmad| 53| null|null| 3|
|Ahmad| 54| 50| 54| 3|
|Ahmad| 90| 100| 90| 2|
|Ahmad| 95| null|null| 2|
|Ahmad| 100| 100| 90| 2|
| Emma| 52| 52| 85| 1|
| Emma| 85| 52| 85| 1|
+-----+----+-----+----+----------+
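For reference, the same result can also be reached in a single pass: left-join DF1 to DF2 on an OR condition (Exam matching either math or phy) and then apply the same window fill to Prev_Rank. This is only a sketch under the assumption that DF1 and DF2 are the DataFrames built above; note that a row could be duplicated if its Exam value matched math in one DF2 row and phy in another.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Left join that accepts a match on either math or phy for the same student.
joined = DF1.join(
    DF2,
    (DF1.namea == DF2.name) & ((DF1.Exam == DF2.math) | (DF1.Exam == DF2.phy)),
    'leftouter'
).drop('name')

# Forward-fill Prev_Rank within each student, ordered by Exam, as in the answer above.
w = Window.partitionBy('namea').orderBy('Exam').rowsBetween(Window.unboundedPreceding, 0)
result = joined.withColumn('Prev_Rank', F.last('Prev_Rank', True).over(w))
result.orderBy('namea', 'Exam').show()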