在 Spark Java API 中加入行数据集

Posted

技术标签:

【中文标题】在 Spark Java API 中加入行数据集【英文标题】:Join Row DataSets In Spark Java API 【发布时间】:2018-05-03 18:29:40 【问题描述】:

我想要两个连接两个数据集 DS1 和 DS2 得到 DS3

DS1:

+---------+--------------------+-----------+------------+
|Compte   |         Lib        |ReportDebit|ReportCredit|
+---------+--------------------+-----------+------------+
|   447105|Autres impôts, ta...|    77171.0|         0.0|
|   753000|Jetons de présenc...|     6839.0|         0.0|
|   511107|Valeurs à l’encai...|        0.0|     77171.0|
+---------+--------------------+-----------+------------+

DS2:

+---------+------------+
|Compte   |SoldeBalance|
+---------+------------+
| 447105  |      992.13|
| 111111  |     35065.0|

我想得到这样的 DS3:

+---------+--------------------+-----------+------------+------------+
|Compte   |           CompteLib|ReportDebit|ReportCredit|SoldeBalance|
+---------+--------------------+-----------+------------+------------+
|   447105|Autres impôts, ta...|    77171.0|         0.0|      992.13|
|   753000|Jetons de présenc...|    6839.0 |         0.0|         0.0|
|   511107|Valeurs à l’encai...|        0.0|     77171.0|         0.0|
    111111|                    |        0.0|         0.0|     35065.0|
+---------+--------------------+-----------+------------+------------+

有人可以用示例 Spark Java 表达式指导我吗? 提前致谢。

【问题讨论】:

欢迎来到 Stack Overflow。您已经发布了一个小时的exactly the same question,该帖子已作为副本关闭。请不要通过删除和重新发布问题来滥用该网站。如果重复没有回答您的询问,edit 问题,并描述您遇到的问题。还要确保遵循How to ask 的说明并提供可重现的示例(minimal reproducible example,Repr. Spark Example) 【参考方案1】:

您可以通过应用完全外连接然后将空值替换为所需值来实现此目的。

import static org.apache.spark.sql.functions.*;

...

ds1.join(ds2, ds1.col("Compte").equalTo(ds2.col("Compte")), "full_outer")
                .select(ds1.col("Compte").alias("Compte1"),
                        ds2.col("Compte").alias("Compte2"),
                        ds1.col("Lib"),
                        ds1.col("ReportDebit"),
                        ds1.col("ReportCredit"),
                        ds2.col("SoldeBalance"))
                .withColumn("Compte", when(col("Compte1").isNull(), col("Compte2")).otherwise(col("Compte1")))
                .drop("Compte1", "Compte2")
                .na().fill(0.0, new String[]  "ReportDebit", "ReportCredit", "SoldeBalance" )
                .na().fill("", new String[]  "Lib" )
                .show();

输出:

+--------------------+-----------+------------+------------+------+
|                 Lib|ReportDebit|ReportCredit|SoldeBalance|Compte|
+--------------------+-----------+------------+------------+------+
|Valeurs à l’encai...|        0.0|     77171.0|         0.0|511107|
|Autres impôts, ta...|    77171.0|         0.0|      992.13|447105|
|                    |        0.0|         0.0|     35065.0|111111|
|Jetons de présenc...|     6839.0|         0.0|         0.0|753000|
+--------------------+-----------+------------+------------+------+

【讨论】:

以上是关于在 Spark Java API 中加入行数据集的主要内容,如果未能解决你的问题,请参考以下文章

如何在JAVA中加入没有重复列的Spark数据框

如何在红移中加入行并将其转置为列

在 Spark 中加入数据集

在 Spark 中加入数据框

在 Spark 中加入倾斜的数据集?

在 Spark Scala 中加入后创建嵌套数据