在 Spark 中合并数据框
Posted
技术标签:
【中文标题】在 Spark 中合并数据框【英文标题】:Merging Dataframes in Spark 【发布时间】:2016-08-01 19:28:42 【问题描述】:我有 2 个数据框,比如 A 和 B。我想将它们加入一个关键列并创建另一个数据框。当 A 和 B 中的键匹配时,我需要从 B 中获取行,而不是从 A 中获取。
例如:
数据帧 A:
Employee1, salary100
Employee2, salary50
Employee3, salary200
数据帧 B
Employee1, salary150
Employee2, salary100
Employee4, salary300
生成的 DataFrame 应该是:
数据帧 C:
Employee1, salary150
Employee2, salary100
Employee3, salary200
Employee4, salary300
如何在 Spark 和 Scala 中做到这一点?
【问题讨论】:
您已添加 Spark SQL 标记。如果你懂 SQL,这应该是一个简单的JOIN
操作
我不擅长 SQL。你能分享一下这个简单的JOIN操作吗?那会很有帮助。
【参考方案1】:
试试:
dfA.registerTempTable("dfA")
dfB.registerTempTable("dfB")
sqlContext.sql("""
SELECT coalesce(dfA.employee, dfB.employee),
coalesce(dfB.salary, dfA.salary) FROM dfA FULL OUTER JOIN dfB
ON dfA.employee = dfB.employee""")
或
sqlContext.sql("""
SELECT coalesce(dfA.employee, dfB.employee),
CASE dfB.employee IS NOT NULL THEN dfB.salary
CASE dfB.employee IS NOT NULL THEN dfA.salary
END FROM dfA FULL OUTER JOIN dfB
ON dfA.employee = dfB.employee""")
【讨论】:
根据我对“右外连接”的理解,在这种情况下它不会选择“Employee3”,对吧? 我喜欢这个的是我可以动态构建'sql'。我是否需要添加“别名”,例如,coalesce(dfB.salary, dfA.salary).alias("salary")?无论如何,我会尝试这个,如果它有效,会给你一个赞成票。谢谢。 @DilTeam 只有在以后需要引用结果表时,才需要在数据帧 API 中使用别名。这纯粹是一个 SQL 解决方案。 其实这个逻辑是不是有bug?假设“工资”值从 100 更改为 NULL。所以在 dfA 中它是 100 而 dfB 它是 NULL。结果值为 100 - 这是不正确的。它已更改为 NULL。我错过了什么吗? 以下是我认为我们可以解决此问题的方法:IF(ISNULL(dfB.employee), dfA.salary, dfB.salary) 而不是 'coalesce'。对吗?【参考方案2】:假设 dfA 和 dfB 有 2 列 emp 和 sal。您可以使用以下内容:
import org.apache.spark.sql.functions => f
val dfB1 = dfB
.withColumnRenamed("sal", "salB")
.withColumnRenamed("emp", "empB")
val joined = dfA
.join(dfB1, 'emp === 'empB, "outer")
.select(
f.coalesce('empB, 'emp).as("emp"),
f.coalesce('salB, 'sal).as("sal")
)
注意:对于 emp 的给定值,每个 Dataframe 应该只有一行
【讨论】:
这是一个很好的答案。但我需要动态构建“sql”,所以我认为 LostInOverflow 的答案会更好。如果您不同意,请告诉我,但感谢您的回答。 @DilTeam 您可以像构建 SQL 一样动态构建 Scala DSL。通常,使用 DSL 会更简单,因为 OO 原则使重用变得更加容易。另外请注意,您可以使用functions.expr(anySqlExpression)
从 SQL 表达式的文本中创建 Column
对象。
@Marl 代码有一个错误:coalesce
采用 Column*
,而不是列名。我修好了。
我对如何生成 f.coalesce('empB, 'emp).as("emp"), f.coalesce('salB, 'sal).as("sal ") 动态。感觉很笨:(有什么指示吗?以上是关于在 Spark 中合并数据框的主要内容,如果未能解决你的问题,请参考以下文章