如何使用来自另一个 Dataset<Row> 的记录更新 Dataset<Row>,这些记录在 Spark 中使用 JAVA API 具有相同的模式?

Posted

技术标签:

【中文标题】如何使用来自另一个 Dataset<Row> 的记录更新 Dataset<Row>,这些记录在 Spark 中使用 JAVA API 具有相同的模式?【英文标题】:How Do you Update a Dataset<Row> with records from another Dataset<Row> which have Identical Schema in Spark with JAVA API? 【发布时间】:2017-06-07 22:41:08 【问题描述】:

假设您有一个包含以下记录的数据集 A:

Dataset A:
    key1, val1
    key2, val2
    key3, val3

Dataset B:
    key4, val4
    key1, valBB
    key5, valN
    key2, NNNNN

“更新”发生后,最终数据集应如下所示:

Dataset Final:
    key1, valBB
    key2, NNNNN
    key3, val3
    key4, val4
    key5, valN

到目前为止我采取的方法是将两个Dataset转换为JavaRDD,然后转换JavaRDD -> JavaPairRDD,然后是firstPairRDD.subtractByKey(secondPairRDD)。这给了我存在于数据集 A 但不存在于数据集 B 中的记录。然后我将其重新转换回数据集。下一步是我使用 DatasetB 进行 Union 以提供更新的数据集。对我来说,这并没有给我预期的结果。我采取了错误的方法吗?任何帮助将不胜感激。

【问题讨论】:

【参考方案1】:

我最终找到了一个更有效的解决方案:

    Dataset<Row> existsInAButNotB = A.join(B, A.col("key").equalTo(B.col("key") "left_anti");
    Dataset<Row> Final = existsInAButNotB.union(B); 

如果您有多个列用作密钥,那么您的解决方案应如下所示:

Dataset<Row> existsInAButNotB = A.join(B, A.col("key1").equalTo(B.col("key1").and(A.col("key2").equalTo(B.col("key2")) "left_anti");

这一行避免用户进入低效的 RDD 世界,避免添加额外的代码。

看看这个:

Left Anti join in Spark?

更多关于左反加入这里:

what is the difference between an anti-join and an anti semi join?

数据集连接 API: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Dataset.html#join(org.apache.spark.sql.Dataset,%20org.apache.spark.sql.Column,%20java.lang.String)

【讨论】:

以上是关于如何使用来自另一个 Dataset<Row> 的记录更新 Dataset<Row>,这些记录在 Spark 中使用 JAVA API 具有相同的模式?的主要内容,如果未能解决你的问题,请参考以下文章

Java-Spark:如何在循环中迭代时获取 Dataset<Row> 列的值并在 when().otherwise() 中使用它?

如何在 spark sql 2.1.0 中的 Dataset<Row> 上获取 groupby 之后的所有列

javaAPI 中是不是有将 Dataset<Row> 转换为 map() 并返回 Dataset<Row>?

将 Json 的 Dataset 列解析为 Dataset<Row>

在 Apache Spark Dataset<Row> 上应用 flatMap 操作时出现意外的编码器行为

Scala - 如何将 Dataset[Row] 转换为可添加到 Dataframe 的列