PySpark:如果一个条件从两个条件实现,则合并两个数据帧

Posted

技术标签:

【中文标题】PySpark:如果一个条件从两个条件实现,则合并两个数据帧【英文标题】:PySpark: Merging two dataframes if one condition achieved from a two conditions 【发布时间】:2018-02-13 03:30:44 【问题描述】:

问:如果一个条件由两个条件实现,有什么方法可以合并两个数据帧?

例如,我有两个数据框:

DF1

 name           Exam
 Ahmad          100
 Ahmad          95
 Ahmad          90
 Emma           80
 Emma           85

第二个数据帧

DF2

 name       math     phy.   prev._Rank
 Ahmad      100      90     2
 Emma       80       85     1

我想要得到的 DF 如下:

NEW DF

name     Exam      math     phy.    Prev._Rank
 Ahmad   100       100      90      2
 Ahmad   95        null     null    2
 Ahmad   90        100      90      2
 Emma    80        80       85      1
 Emma    85        80       85      1   

【问题讨论】:

它看起来像左加入,但显然你搞砸了预期的结果,所以我没有看到任何逻辑...... 您能否添加更多关于您想要实现的目标的详细信息 如果数学或物理有价值,我想加入两个 DF。列与同一学生的考试列中的值匹配,保留其他值在 DF2 中不存在,如果可以为同一学生提供 prev._rank,如第 2 行所示,它会更好。 【参考方案1】:
DF1=spark.createDataFrame([('Ahmad','100'),('Ahmad','95'),('Ahmad','90'),('Ahmad','50'),('Ahmad','51'),('Ahmad','54'),('Ahmad','53'),('Emma','52'),('Emma','85')],['namea','Exam'])

DF1=DF1.select('namea',DF1.Exam.cast('integer'))




DF1.show()

+-----+----+
|namea|Exam|
+-----+----+
|Ahmad| 100|
|Ahmad|  95|
|Ahmad|  90|
|Ahmad|  50|
|Ahmad|  51|
|Ahmad|  54|
|Ahmad|  53|
| Emma|  52|
| Emma|  85|
+-----+----+

DF2=spark.createDataFrame([('Ahmad','100','90','2'),('Ahmad','50','54','3'),('Emma','52','85','1')],['name','math','phy','Prev_Rank'])
DF2=DF2.select('name',DF2.math.cast('integer'),DF2.phy.cast('integer'),'Prev_Rank')


DF2.show()

+-----+----+---+---------+
| name|math|phy|Prev_Rank|
+-----+----+---+---------+
|Ahmad| 100| 90|        2|
|Ahmad|  50| 54|        3|
| Emma|  52| 85|        1|
+-----+----+---+---------+

解决方案

DF3=DF1.join(DF2,[DF1.namea==DF2.name,DF1.Exam==DF2.math],'leftouter')

DF3.show()

+-----+----+-----+----+----+---------+
|namea|Exam| name|math| phy|Prev_Rank|
+-----+----+-----+----+----+---------+
|Ahmad|  90| null|null|null|     null|
| Emma|  85| null|null|null|     null|
|Ahmad|  50|Ahmad|  50|  54|        3|
|Ahmad|  53| null|null|null|     null|
|Ahmad|  54| null|null|null|     null|
| Emma|  52| Emma|  52|  85|        1|
|Ahmad|  95| null|null|null|     null|
|Ahmad| 100|Ahmad| 100|  90|        2|
|Ahmad|  51| null|null|null|     null|
+-----+----+-----+----+----+---------+

DF4=DF1.join(DF2,[DF1.namea==DF2.name,DF1.Exam==DF2.phy],'leftouter').withColumnRenamed('name','name1').withColumnRenamed('math','math1').withColumnRenamed('phy','phy1').withColumnRenamed('Prev_Rank','Prev_Rank1')

DF4.show()

+-----+----+-----+-----+----+----------+
|namea|Exam|name1|math1|phy1|Prev_Rank1|
+-----+----+-----+-----+----+----------+
|Ahmad|  90|Ahmad|  100|  90|         2|
| Emma|  85| Emma|   52|  85|         1|
|Ahmad|  50| null| null|null|      null|
|Ahmad|  53| null| null|null|      null|
|Ahmad|  54|Ahmad|   50|  54|         3|
| Emma|  52| null| null|null|      null|
|Ahmad|  95| null| null|null|      null|
|Ahmad| 100| null| null|null|      null|
|Ahmad|  51| null| null|null|      null|
+-----+----+-----+-----+----+----------+

DF5=DF4.join(DF3,['namea','Exam'],'inner').orderBy(['namea','Exam'])

DF5.show()

+-----+----+-----+-----+----+----------+-----+----+----+---------+
|namea|Exam|name1|math1|phy1|Prev_Rank1| name|math| phy|Prev_Rank|
+-----+----+-----+-----+----+----------+-----+----+----+---------+
|Ahmad|  50| null| null|null|      null|Ahmad|  50|  54|        3|
|Ahmad|  51| null| null|null|      null| null|null|null|     null|
|Ahmad|  53| null| null|null|      null| null|null|null|     null|
|Ahmad|  54|Ahmad|   50|  54|         3| null|null|null|     null|
|Ahmad|  90|Ahmad|  100|  90|         2| null|null|null|     null|
|Ahmad|  95| null| null|null|      null| null|null|null|     null|
|Ahmad| 100| null| null|null|      null|Ahmad| 100|  90|        2|
| Emma|  52| null| null|null|      null| Emma|  52|  85|        1|
| Emma|  85| Emma|   52|  85|         1| null|null|null|     null|
+-----+----+-----+-----+----+----------+-----+----+----+---------+

DF6=DF5.withColumn("name1",coalesce(DF5.name1,DF5.name)).withColumn("math1",coalesce(DF5.math1,DF5.math)).withColumn("phy1",coalesce(DF5.phy1,DF5.phy)).withColumn("Prev_Rank1",coalesce(DF5.Prev_Rank1,DF5.Prev_Rank)).drop('name','math','phy','Prev_Rank')

DF6.show()

+-----+----+-----+-----+----+----------+
|namea|Exam|name1|math1|phy1|Prev_Rank1|
+-----+----+-----+-----+----+----------+
|Ahmad|  50|Ahmad|   50|  54|         3|
|Ahmad|  51| null| null|null|      null|
|Ahmad|  53| null| null|null|      null|
|Ahmad|  54|Ahmad|   50|  54|         3|
|Ahmad|  90|Ahmad|  100|  90|         2|
|Ahmad|  95| null| null|null|      null|
|Ahmad| 100|Ahmad|  100|  90|         2|
| Emma|  52| Emma|   52|  85|         1|
| Emma|  85| Emma|   52|  85|         1|
+-----+----+-----+-----+----+----------+

import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as fn
​

DF7=DF6.withColumn("Prev_Rank1", fn.last('Prev_Rank1', True).over(Window.partitionBy('namea').orderBy('Exam').rowsBetween(-sys.maxsize, 0))).drop('name1')


DF7.show()

+-----+----+-----+----+----------+
|namea|Exam|math1|phy1|Prev_Rank1|
+-----+----+-----+----+----------+
|Ahmad|  50|   50|  54|         3|
|Ahmad|  51| null|null|         3|
|Ahmad|  53| null|null|         3|
|Ahmad|  54|   50|  54|         3|
|Ahmad|  90|  100|  90|         2|
|Ahmad|  95| null|null|         2|
|Ahmad| 100|  100|  90|         2|
| Emma|  52|   52|  85|         1|
| Emma|  85|   52|  85|         1|
+-----+----+-----+----+----------+

【讨论】:

以上是关于PySpark:如果一个条件从两个条件实现,则合并两个数据帧的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 正则表达式以两个条件提取字符串

使用pyspark中的条件创建具有运行总量的列

根据R中的条件合并列

有条件的 PySpark 窗口

如果两个条件为真,则 php 回显

计算 Pyspark 中发生条件时两个事件之间的月数