在scala中将列从一个数据帧添加到另一个数据帧[重复]

Posted

技术标签:

【中文标题】在scala中将列从一个数据帧添加到另一个数据帧[重复]【英文标题】:add column from one dataframe to another dataframe in scala [duplicate] 【发布时间】:2017-10-31 05:30:52 【问题描述】:

我有两个行数相同的 DataFrame,但列数不同,并且根据来源是动态的。

第一个 DataFrame 包含所有列,但第二个 DataFrame 被过滤和处理,没有其他所有列。

需要从第一个 DataFrame 中选择特定列并与第二个 DataFrame 添加/合并。

val sourceDf = spark.read.load(parquetFilePath)
val resultDf = spark.read.load(resultFilePath)

val columnName :String="Col1"

我尝试了几种添加方式,这里我只是给出几个....

val modifiedResult = resultDf.withColumn(columnName, sourceDf.col(columnName))

val modifiedResult = resultDf.withColumn(columnName, sourceDf(columnName))
val modifiedResult = resultDf.withColumn(columnName, labelColumnUdf(sourceDf.col(columnName)))

这些都不起作用。

您能否帮我将列从第一个 DataFrame 合并/添加到第二个 DataFrame。

给出的示例不是我需要的确切数据结构,但它将满足我解决此问题的要求。

样本输入输出:

Source DataFrame:
+---+------+---+
|InputGas|
+---+------+---+
|1000|
|2000|
|3000|
|4000|
+---+------+---+

Result DataFrame:
+---+------+---+
| Time|CalcGas|Speed|
+---+------+---+
|  0 | 111| 1111|
|  0 | 222| 2222|
|  1 | 333| 3333|
|  2 | 444| 4444|
+---+------+---+

Expected Output:
+---+------+---+
|Time|CalcGas|Speed|InputGas|
+---+------+---+---+
|  0|111 | 1111 |1000|
|  0|222 | 2222 |2000|
|  1|333 | 3333 |3000|
|  2|444 | 4444 |4000|
+---+------+---+---+

【问题讨论】:

【参考方案1】:

使用join实现此目的的一种方法

如果您在两个数据框中都有一些共同的列,那么您可以对该列执行连接并获得您想要的结果。

示例:

import sparkSession.sqlContext.implicits._

val df1 = Seq((1, "Anu"),(2, "Suresh"),(3, "Usha"), (4, "Nisha")).toDF("id","name")
val df2 = Seq((1, 23),(2, 24),(3, 24), (4, 25), (5, 30), (6, 32)).toDF("id","age")

val df = df1.as("df1").join(df2.as("df2"), df1("id") === df2("id")).select("df1.id", "df1.name", "df2.age")
df.show()

输出:

+---+------+---+
| id|  name|age|
+---+------+---+
|  1|   Anu| 23|
|  2|Suresh| 24|
|  3|  Usha| 24|
|  4| Nisha| 25|
+---+------+---+

更新:

如果您在两个数据帧中没有任何共同的唯一 ID,则创建一个并使用它。

import sparkSession.sqlContext.implicits._
import org.apache.spark.sql.functions._

var sourceDf = Seq(1000, 2000, 3000, 4000).toDF("InputGas")
var resultDf  = Seq((0, 111, 1111), (0, 222, 2222), (1, 333, 3333), (2, 444, 4444)).toDF("Time", "CalcGas", "Speed")

sourceDf = sourceDf.withColumn("rowId1", monotonically_increasing_id())
resultDf = resultDf.withColumn("rowId2", monotonically_increasing_id())

val df = sourceDf.as("df1").join(resultDf.as("df2"), sourceDf("rowId1") === resultDf("rowId2"), "inner").select("df1.InputGas", "df2.Time", "df2.CalcGas", "df2.Speed")
df.show()

输出:

+--------+----+-------+-----+
|InputGas|Time|CalcGas|Speed|
+--------+----+-------+-----+
|    1000|   0|    111| 1111|
|    2000|   0|    222| 2222|
|    3000|   1|    333| 3333|
|    4000|   2|    444| 4444|
+--------+----+-------+-----+

【讨论】:

请注意,这并不总是有效(尽管它适用于小型数据帧)。 monotonically_increasing_id 只保证数字在增加,不保证使用哪些数字。因此,赋予两个数据帧的数字可能不同。 @Shaido 是的,完全同意 在第一个示例中,如果 .as("df1") 已经是数据框的名称,为什么还要这样做? 它是给df1一个别名,以便我可以在我的select操作中使用这个别名 ***.com/questions/47894877/…

以上是关于在scala中将列从一个数据帧添加到另一个数据帧[重复]的主要内容,如果未能解决你的问题,请参考以下文章

Pandas 基于连接将列从一个数据帧添加到另一个数据帧

根据几个条件将列从一个数据帧映射到另一个数据帧,以考虑存在的多个映射中的一个映射

将列从一个数据帧合并到另一个数据帧(left_join不起作用) - rstudio

如何向 pandas df 添加一个新列,该列从另一个数据帧返回同一组中更大的最小值

如何在 Scala 中将数据帧转换为 Apache Spark 中的数据集?

Apache Spark 如何将新列从列表/数组附加到 Spark 数据帧