在 Spark 中合并数据框

Posted

技术标签:

【中文标题】在 Spark 中合并数据框【英文标题】:Merging Dataframes in Spark 【发布时间】:2016-08-01 19:28:42 【问题描述】:

我有 2 个数据框,比如 A 和 B。我想将它们加入一个关键列并创建另一个数据框。当 A 和 B 中的键匹配时,我需要从 B 中获取行,而不是从 A 中获取。

例如:

数据帧 A:

Employee1, salary100
Employee2, salary50
Employee3, salary200

数据帧 B

Employee1, salary150
Employee2, salary100
Employee4, salary300

生成的 DataFrame 应该是:

数据帧 C:

Employee1, salary150
Employee2, salary100
Employee3, salary200
Employee4, salary300

如何在 Spark 和 Scala 中做到这一点?

【问题讨论】:

您已添加 Spark SQL 标记。如果你懂 SQL,这应该是一个简单的JOIN 操作 我不擅长 SQL。你能分享一下这个简单的JOIN操作吗?那会很有帮助。 【参考方案1】:

试试:

dfA.registerTempTable("dfA")
dfB.registerTempTable("dfB")

sqlContext.sql("""
SELECT coalesce(dfA.employee, dfB.employee), 
       coalesce(dfB.salary, dfA.salary) FROM dfA FULL OUTER JOIN dfB
ON dfA.employee = dfB.employee""")

sqlContext.sql("""
SELECT coalesce(dfA.employee, dfB.employee),
  CASE dfB.employee IS NOT NULL THEN dfB.salary
  CASE dfB.employee IS NOT NULL THEN dfA.salary
  END FROM dfA FULL OUTER JOIN dfB
ON dfA.employee = dfB.employee""")

【讨论】:

根据我对“右外连接”的理解,在这种情况下它不会选择“Employee3”,对吧? 我喜欢这个的是我可以动态构建'sql'。我是否需要添加“别名”,例如,coalesce(dfB.salary, dfA.salary).alias("salary")?无论如何,我会尝试这个,如果它有效,会给你一个赞成票。谢谢。 @DilTeam 只有在以后需要引用结果表时,才需要在数据帧 API 中使用别名。这纯粹是一个 SQL 解决方案。 其实这个逻辑是不是有bug?假设“工资”值从 100 更改为 NULL。所以在 dfA 中它是 100 而 dfB 它是 NULL。结果值为 100 - 这是不正确的。它已更改为 NULL。我错过了什么吗? 以下是我认为我们可以解决此问题的方法:IF(ISNULL(dfB.employee), dfA.salary, dfB.salary) 而不是 'coalesce'。对吗?【参考方案2】:

假设 dfA 和 dfB 有 2 列 emp 和 sal。您可以使用以下内容:

import org.apache.spark.sql.functions => f

val dfB1 = dfB
  .withColumnRenamed("sal", "salB")
  .withColumnRenamed("emp", "empB")

val joined = dfA
  .join(dfB1, 'emp === 'empB, "outer")
  .select(
    f.coalesce('empB, 'emp).as("emp"),
    f.coalesce('salB, 'sal).as("sal")
  )

注意:对于 emp 的给定值,每个 Dataframe 应该只有一行

【讨论】:

这是一个很好的答案。但我需要动态构建“sql”,所以我认为 LostInOverflow 的答案会更好。如果您不同意,请告诉我,但感谢您的回答。 @DilTeam 您可以像构建 SQL 一样动态构建 Scala DSL。通常,使用 DSL 会更简单,因为 OO 原则使重用变得更加容易。另外请注意,您可以使用 functions.expr(anySqlExpression) 从 SQL 表达式的文本中创建 Column 对象。 @Marl 代码有一个错误:coalesce 采用 Column*,而不是列名。我修好了。 我对如何生成 f.coalesce('empB, 'emp).as("emp"), f.coalesce('salB, 'sal).as("sal ") 动态。感觉很笨:(有什么指示吗?

以上是关于在 Spark 中合并数据框的主要内容,如果未能解决你的问题,请参考以下文章

将新行与spark scala中数据框中的前一行数据合并

如何在火花中合并或连接具有不相等列号的数据框

如何在JAVA中加入没有重复列的Spark数据框

如何在JAVA中加入没有重复列的Spark数据框

如何将列表数组合并到单列中并使其适合现有的数据框?

Spark中的拆分,操作和联合数据框