Apache Spark:如何使用 Java 在 dataFrame 中的空值列中插入数据

Posted

技术标签:

【中文标题】Apache Spark:如何使用 Java 在 dataFrame 中的空值列中插入数据【英文标题】:Apache Spark : how to insert data in a column with empty values in dataFrame using Java 【发布时间】:2018-04-20 07:24:52 【问题描述】:

我必须使用 DataFrame2DataFrame1 中可用的值插入到具有空值的列之一中。基本上更新 DataFrame2 中的列。

两个 DataFrame 都有 2 个公共列。

有没有办法使用 Java 做同样的事情?或者可以有不同的方法?

示例输入:

1) 文件 1.csv

BILL_ID,BILL_NBR_TYPE_CD,BILL_NBR,VERSION,PRIM_SW
0501841898,BIN     ,404154,1000,Y
0681220958,BIN     ,735332,1000,Y
5992410180,BIN     ,454680,1000,Y
6995270884,SREBIN  ,1000252750295575,1000,Y

这里BILL_ID 是系统ID,BILL_NBR 是外部ID。

2) 文件2.csv

TXN_ID,TXN_TYPE,BILL_ID,BILL_NBR_TYPE_CD,BILL_NBR
01234, ABC     ,"     ",BIN     ,404154
22365, XYZ     ,"     ",BIN     ,735332
45890, LKJ     ,"     ",BIN     ,454680
23456, MPK     ,"     ",SREBIN  ,1000252750295575

样本输出

如下图BILL_ID值应填入File2.csv

01234, ABC     ,501841898,BIN     ,404154
22365, XYZ     ,681220958,BIN     ,735332
45890, LKJ     ,5992410180,BIN     ,454680
23456, MPK     ,6995270884,SREBIN  ,1000252750295575

我创建了两个 DataFrame 并将两个文件的数据都加载到其中,现在我不知道如何继续。

编辑

基本上我想清楚以下三个步骤:

    如何从 File2.csv 中获取 BILL_NBR 和 BILL_NBR_TYPE_CD 值?

对于这一步我写了:file2Df.select("BILL_NBR_TYPE_CD","BILL_NBR");

    如何根据 step1 中检索到的值从 File1.csv 中获取 BILL_ID 值?

    如何在 File2.csv 中相应地更新 BILL_ID 值?

我是火花新手,如果有人可以指点,我将不胜感激。

【问题讨论】:

这是一个简单的 SQL 连接问题。在df1df2然后从df1df2 span>之间进行选择 复制:***.com/questions/43033835/… @philantrovert 谢谢指出......但是可以基于两列进行内连接吗?我正在检查 API 是否相同。 File2 中的 BILL_ID 列也是空的,它会去哪里? @philantrovert 我已经尝试过数据集 加入 = txnDf.join(accountDf,txnDf.col("BILL_NBR").equalTo(accountDf.col("BILL_NBR")).and(txnDf. col("BILL_NBR_TYPE_CD").equalTo(accountDf.col("BILL_NBR_TYPE_CD"))),"inner");根据您的建议,但收到此错误:线程“主”org.apache.spark.sql.AnalysisException 中的异常:插入分配和书籍/输出文件/Transformed23Apr.csv 时发现重复列:bill_nbr,@987654335 @, bill_nbr_type_cd; 【参考方案1】:

您需要根据BILL_NBR 列连接两个表。

假设:BILL_NBRBILL_ID 列之间存在一对一的关系。

假设您的 File1.csv 和 File2.csv 的数据框名称分别为 file1DFfile2DF,以下应该适合您:

Dataset<Row> file1DF = file1DF.select("BILL_ID","BILL_NBR","BILL_NBR_TYPE_CD");
Dataset<Row> file2DF = file2DF.select("TXN_ID","TXN_TYPE","BILL_NBR_TYPE_CD","BILL_NBR");
Dataset<Row> file2DF = file2DF.join(file1DF, file1DF("BILL_NBR","BILL_NBR_TYPE_CD"));

注意:我没有资源来运行上面的代码来测试它。如果您遇到任何编译时或运行时错误,请告诉我。

【讨论】:

BILL_NBR,BILL_NBR_TYPE_CD 和 BILL_ID 之间存在一对一的关系,所以应该根据这两列进行连接吧?你能更新代码吗? 是的,也添加另一列,它应该可以工作。你试过吗? 更新了代码。不确定语法的正确性。 执行 crossJoin() 并在某些条件下将其存储回来 @vatsalmevada 在你写的最后一行 file1DF("BILL_NBR","BILL_NBR_TYPE_CD") 给出了编译器错误,因为没有定义这样的函数,你打算在那里使用 select 吗?

以上是关于Apache Spark:如何使用 Java 在 dataFrame 中的空值列中插入数据的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Java API 将文本文件、图像存储到 Apache Spark 中?

如何在Spark提交中使用s3a和Apache spark 2.2(hadoop 2.8)?

Apache Spark:如何使用 Java 在 dataFrame 中的空值列中插入数据

如何在现有的 Web 应用程序中使用 apache spark

如何在 Apache Spark 中将 Scala UDF 转换为 Java 版本?

如何使用 Java 将 unix 纪元的列转换为 Apache spark DataFrame 中的日期?