如何使用来自另一个 Dataset<Row> 的记录更新 Dataset<Row>,这些记录在 Spark 中使用 JAVA API 具有相同的模式?
Posted
技术标签:
【中文标题】如何使用来自另一个 Dataset<Row> 的记录更新 Dataset<Row>,这些记录在 Spark 中使用 JAVA API 具有相同的模式?【英文标题】:How Do you Update a Dataset<Row> with records from another Dataset<Row> which have Identical Schema in Spark with JAVA API? 【发布时间】:2017-06-07 22:41:08 【问题描述】:假设您有一个包含以下记录的数据集 A:
Dataset A:
key1, val1
key2, val2
key3, val3
Dataset B:
key4, val4
key1, valBB
key5, valN
key2, NNNNN
“更新”发生后,最终数据集应如下所示:
Dataset Final:
key1, valBB
key2, NNNNN
key3, val3
key4, val4
key5, valN
到目前为止我采取的方法是将两个Dataset转换为JavaRDD,然后转换JavaRDD -> JavaPairRDD,然后是firstPairRDD.subtractByKey(secondPairRDD)。这给了我存在于数据集 A 但不存在于数据集 B 中的记录。然后我将其重新转换回数据集。下一步是我使用 DatasetB 进行 Union 以提供更新的数据集。对我来说,这并没有给我预期的结果。我采取了错误的方法吗?任何帮助将不胜感激。
【问题讨论】:
【参考方案1】:我最终找到了一个更有效的解决方案:
Dataset<Row> existsInAButNotB = A.join(B, A.col("key").equalTo(B.col("key") "left_anti");
Dataset<Row> Final = existsInAButNotB.union(B);
如果您有多个列用作密钥,那么您的解决方案应如下所示:
Dataset<Row> existsInAButNotB = A.join(B, A.col("key1").equalTo(B.col("key1").and(A.col("key2").equalTo(B.col("key2")) "left_anti");
这一行避免用户进入低效的 RDD 世界,避免添加额外的代码。
看看这个:
Left Anti join in Spark?
更多关于左反加入这里:
what is the difference between an anti-join and an anti semi join?
数据集连接 API: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Dataset.html#join(org.apache.spark.sql.Dataset,%20org.apache.spark.sql.Column,%20java.lang.String)
【讨论】:
以上是关于如何使用来自另一个 Dataset<Row> 的记录更新 Dataset<Row>,这些记录在 Spark 中使用 JAVA API 具有相同的模式?的主要内容,如果未能解决你的问题,请参考以下文章
Java-Spark:如何在循环中迭代时获取 Dataset<Row> 列的值并在 when().otherwise() 中使用它?
如何在 spark sql 2.1.0 中的 Dataset<Row> 上获取 groupby 之后的所有列
javaAPI 中是不是有将 Dataset<Row> 转换为 map() 并返回 Dataset<Row>?
将 Json 的 Dataset 列解析为 Dataset<Row>