Spark 以非连接列上的条件连接
Posted
技术标签:
【中文标题】Spark 以非连接列上的条件连接【英文标题】:Spark joins with condition on non join column 【发布时间】:2019-04-17 18:55:38 【问题描述】:我有以下两个数据框,我想根据 col A 加入它们
df1:
+------+--------+-------+
| A | B | C |
+------+--------+-------+
| a1 | 5 | asd |
| a2 | 12 | asd |
+------+--------+-------+
df2:
+------+--------+-------+
| A | B | D |
+------+--------+-------+
| a1 | 8 | qwe |
| a2 | 10 | qwe |
+------+--------+-------+
由于 B 列相同,我们假设在两者之间进行选择是有逻辑的,例如选择
+------+--------+------+-----+
| A | B | C | D |
+------+--------+------+-----+
| a1 | 8 | asd | qwe |
| a2 | 12 | asd | qwe |
+------+--------+-------+----+
实现此目的的简单方法是:
val _df1 = df1.withColumnRenamed("B","B_df1")
val _df2 = df2.withColumnRenamed("B", "B_df2)
_df1.join(_df2, Seq("A"))
.withColumn("B", when(col("B_df1") > col("B_df2"),
col("B_df1"))
.otherwise(col("B_df2"))
.drop(col("B_df1")
.drop("B_df2")
有没有更好的方法可以在不重命名和删除列的情况下实现这一点?
【问题讨论】:
【参考方案1】:这是使用selectExpr
的另一种方法。它节省了一些删除列的努力。
import spark.implicits._
val df1 = Seq(("a1",5,"asd"),
("a2",12,"asd")
).toDF("A","B","C")
val df2 = Seq(("a1",8,"qwe"),
("a2",10,"qwe")
).toDF("A","B","D")
import org.apache.spark.sql.functions.col
df1.as("a").join(df2.as("b"), col("a.A") === col("b.A")).selectExpr("a.A AS A",
"CASE WHEN a.B>b.B THEN a.B ELSE b.B END AS B",
"a.C AS C",
"b.D AS D").show()
【讨论】:
以上是关于Spark 以非连接列上的条件连接的主要内容,如果未能解决你的问题,请参考以下文章