如何基于第二个 DataFrame (Java) 在 Spark DataFrame 中创建新列?
Posted
技术标签:
【中文标题】如何基于第二个 DataFrame (Java) 在 Spark DataFrame 中创建新列?【英文标题】:How to create a new column in a Spark DataFrame based on a second DataFrame (Java)? 【发布时间】:2016-05-26 22:15:46 【问题描述】:我有两个 Spark DataFrame,其中一个有两个 cols,id 和 Tag。第二个 DataFrame 有一个 id col,但缺少 Tag。第一个 Dataframe 本质上是一个字典,每个 id 出现一次,而在第二个 DataFrame 中,id 可能出现多次。我需要的是在第二个 DataFrame 中创建一个新的 col,它的 Tag 作为每行中 id 的函数(在第二个 DataFrame 中)。我认为这可以通过首先转换为 RDD ..etc 来完成,但我认为必须有一种更优雅的方式使用 DataFrames(在 Java 中)。示例:给定一个 df1 Row-> id: 0, Tag: "A", a df2 Row1-> id: 0, Tag: null, a df2 Row2-> id: 0, Tag: "B",我需要在结果 DataFrame df3 中创建一个 Tag col 等于 df1(id=0) = "A" IF df2 Tag 为空,但保持原始如果不为空则标记 => 导致 df3 Row1-> id: 0, Tag: "A", df3 Row2-> id: 0, Tag: "B"。希望这个例子很清楚。
| ID | No. | Tag | new Tag Col |
| 1 | 10002 | A | A |
| 2 | 10003 | B | B |
| 1 | 10004 | null | A |
| 2 | 10005 | null | B |
【问题讨论】:
简单的LEFT OUTER JOIN
不适合你的任何原因?
我编辑了这个问题并将调查 LOJ..
LOJ 并没有完全解决它,但我认为如果我使用 udf() 跟随它,那么我会得到我需要的东西。谢谢,
如果还不够,您可以尝试使用示例输入和预期输出来更新问题。现在真的很难理解你想要什么。
当 Tag 为 null 时,new Tag = Tag(id)。例如。 Tag(id=1) = A,所以我们将 A 分配给 10004,将 Tag(id=2) = B 分配给 10005。我想我需要一个 udf()。
【参考方案1】:
这里只需要左外连接和coalesce
:
import org.apache.spark.sql.functions.coalesce
val df = sc.parallelize(Seq(
(1, 10002, Some("A")), (2, 10003, Some("B")),
(1, 10004, None), (2, 10005, None)
)).toDF("id", "no", "tag")
val lookup = sc.parallelize(Seq(
(1, "A"), (2, "B")
)).toDF("id", "tag")
df.join(lookup, df.col("id").equalTo(lookup.col("id")), "leftouter")
.withColumn("new_tag", coalesce(df.col("tag"), lookup.col("tag")))
这应该与 Java 版本几乎相同。
【讨论】:
这在 Java 中不起作用。它期待 Seq以上是关于如何基于第二个 DataFrame (Java) 在 Spark DataFrame 中创建新列?的主要内容,如果未能解决你的问题,请参考以下文章
Pandas:如何在第二个 DataFrame 的另一列中查找子字符串位置
PySpark 根据第二个 DataFrame 的列向一个 DataFrame 添加值
有没有办法用第二个 Dataframe 中的一列填充一个 Dataframe 中的一列?