使用来自另一个数据集的值搜索和更新 Spark 数据集列

Posted

技术标签:

【中文标题】使用来自另一个数据集的值搜索和更新 Spark 数据集列【英文标题】:Searching and updating a Spark Dataset column with values from another Dataset 【发布时间】:2020-01-24 15:30:36 【问题描述】:

Java 8 和 Spark 2.11:2.3.2 在这里。尽管我更喜欢 Java API 答案,但我确实会讲一点 Scala,所以我将能够理解其中提供的任何答案!但是如果可能的话,Java(请)!

我有两个具有不同架构的数据集,例外是一个常见的“model_number”(字符串)列:两者都存在。

对于我的第一个数据集中的每一行(我们称之为d1),我需要扫描/搜索第二个数据集(“d2”)以查看是否有一行具有相同的model_number ,如果是,请更新另一个 d2 列。

这是我的数据集架构:

d1
===========
model_number : string
desc : string
fizz : string
buzz : date

d2
===========
model_number : string
price : double
source : string

同样,如果d1 行有一个model_number,比如12345,并且d2 行也有相同的model_number,我想通过将d2.price 乘以@ 来更新它987654332@.

迄今为止我最好的尝试:

// I *think* this would give me a 3rd dataset with all d1 and d2 columns, but only
// containing rows from d1 and d2 that have matching 'model_number' values
Dataset<Row> d3 = d1.join(d2, d1.col("model_number") == d2.col("model_number"));

// now I just need to update d2.price based on matching
Dataset<Row> d4 = d3.withColumn("adjusted_price", d3.col("price") * 10.0);

谁能帮我越过终点线?提前致谢!

【问题讨论】:

join 是您正在寻找的。如果您遇到任何问题,请尝试并返回。 感谢@VamsiPrabhala (+1) 我查看了join 文档并一起“科学地”提出了一个潜在的解决方案(请参阅我的更新!)。我想我现在更接近了,但仍然无法透过树木看到森林!感谢您在这里提供的所有帮助! 【参考方案1】:

这里有几点,正如评论中提到的@VamsiPrabhala,您需要在特定字段上使用join 的功能。关于“update”,你需要记住spark中的dfdsrdd是不可变的,所以你不能update他们。所以,这里的解决方案是,在join 你的df 之后,你需要在select 或使用withColumn 然后select 中执行你的计算,在这种情况下是乘法。换句话说,您不能更新该列,但您可以使用“new”列创建新的df

例子:

Input data:

+------------+------+------+----+
|model_number|  desc|  fizz|buzz|
+------------+------+------+----+
|     model_a|desc_a|fizz_a|null|
|     model_b|desc_b|fizz_b|null|
+------------+------+------+----+

+------------+-----+--------+
|model_number|price|  source|
+------------+-----+--------+
|     model_a| 10.0|source_a|
|     model_b| 20.0|source_b|
+------------+-----+--------+

使用join 将输出:

val joinedDF = d1.join(d2, "model_number")
joinedDF.show()

+------------+------+------+----+-----+--------+
|model_number|  desc|  fizz|buzz|price|  source|
+------------+------+------+----+-----+--------+
|     model_a|desc_a|fizz_a|null| 10.0|source_a|
|     model_b|desc_b|fizz_b|null| 20.0|source_b|
+------------+------+------+----+-----+--------+

应用您的计算:

joinedDF.withColumn("price", col("price") * 10).show()

output:
+------------+------+------+----+-----+--------+
|model_number|  desc|  fizz|buzz|price|  source|
+------------+------+------+----+-----+--------+
|     model_a|desc_a|fizz_a|null| 100.0|source_a|
|     model_b|desc_b|fizz_b|null| 200.0|source_b|
+------------+------+------+----+-----+--------+

【讨论】:

谢谢@Caesar (+1) 但是有一个问题要问你:你说 RDD/DF/DS 是不可变的 并且不能更新。但是joinedDF.withColumn("price", col("price") * 2) 不是在更新joinedDF 上的现有 price 列吗? 哦,我明白了,withColumn 返回一个新的 DF,而恰好新的 DF 也有一个 price 列,对吧?如果 是真的,那么我唯一的问题是:col("price") 是否知道将“价格”从joinedDF 中拉下来,或者我真的应该对我正在谈论的数据集进行限定,例如:@ 987654346@? 关于您的第一个问题,正如您已经提到的,它返回一个新的 dfprice 列,现在具有新值。关于您的第二个问题,使用 col("price") 您隐含地告诉 spark 您想在 joinedDF 中使用列 price 。所有列名(在本例中为“价格”)必须存在于您的 joinedDF 中。

以上是关于使用来自另一个数据集的值搜索和更新 Spark 数据集列的主要内容,如果未能解决你的问题,请参考以下文章

Spark和Scala,通过映射公用键添加具有来自另一个数据帧的值的新列[重复]

使用来自另一个数据帧的值更新数据帧标头

PySpark 更新某些列的值

使用空数据集的Spark SQL连接会导致更大的输出文件大小

使用来自另一个 MEF 程序集的类而不引用它

Pyspark 通过使用另一列中的值替换 Spark 数据框列中的字符串