如何使用 JSON 映射文件在 Spark 中使用 Scala 生成新的 DataFrame
Posted
技术标签:
【中文标题】如何使用 JSON 映射文件在 Spark 中使用 Scala 生成新的 DataFrame【英文标题】:How to use a JSON mapping file to generate a new DataFrame in Spark using Scala 【发布时间】:2018-07-24 08:52:11 【问题描述】:我有两个 DataFrame
s、DF1
和 DF2
,以及一个 JSON 文件,我需要将其用作映射文件来创建另一个数据帧 (DF3
)。
DF1:
+-------+-------+-------+
|column1|column2|column3|
+-------+-------+-------+
| 100| John| Mumbai|
| 101| Alex| Delhi|
| 104| Divas|Kolkata|
| 108| Jerry|Chennai|
+-------+-------+-------+
DF2:
+-------+-----------+-------+
|column4| column5|column6|
+-------+-----------+-------+
| S1| New| xxx|
| S2| Old| yyy|
| S5|replacement| zzz|
| S10| New| ppp|
+-------+-----------+-------+
除了这个我拥有的 JSON 格式的映射文件之外,它将用于生成 DF3。
以下是 JSON 映射文件:
"targetColumn":"newColumn1","sourceField1":"column2","sourceField2":"column4"
"targetColumn":"newColumn2","sourceField1":"column7","sourceField2":"column5"
"targetColumn":"newColumn3","sourceField1":"column8","sourceField2":"column6"
因此,我需要从这个 JSON 文件中创建 DF3
,并在映射的 targetColumn
部分中提供一列,如果它存在于 DF1 中,它将检查源列,然后它从 @ 映射到 sourceField1
987654331@ 否则 sourceField2
来自 DF2
。
下面是预期的输出。
+----------+-----------+----------+
|newColumn1| newColumn2|newColumn3|
+----------+-----------+----------+
| John| New| xxx|
| Alex| Old| yyy|
| Divas|replacement| zzz|
| Jerry| New| ppp|
+----------+-----------+----------+
这里的任何帮助都将被占用。
【问题讨论】:
你尝试过 Divas 什么? org.apache.spark.sql -> from_json + schema,全部内置 @Pavel 你能举一个例子我如何在我的代码中实现这一点吗?因为我是 spark 的新手 ds.withColumn("my_json_column", from_json( col("my_json_column"), validJsonSchema) @Pavel 这不起作用。你能在这里解释一下 my_json_column 是什么吗 【参考方案1】:解析JSON
并创建下面的List
自定义对象
case class SrcTgtMapping(targetColumn:String,sourceField1:String,sourceField2:String)
val srcTgtMappingList=List(SrcTgtMapping("newColumn1","column2","column4"),SrcTgtMapping("newColumn2","column7","column5"),SrcTgtMapping("newColumn3","column8","column6"))
将虚拟index column
添加到dataframes
,并基于index column
加入dataframes
import org.apache.spark.sql.functions._
val df1WithIndex=df1.withColumn("index",monotonicallyIncreasingId)
val df2WithIndex=df2.withColumn("index",monotonicallyIncreasingId)
val joinedDf=df1WithIndex.join(df2WithIndex,df1WithIndex.col("index")===df2WithIndex.col("index"))
创建query
并执行它。
val df1Columns=df1WithIndex.columns.toList
val df2Columns=df2WithIndex.columns.toList
val query=srcTgtMappingList.map(stm=>if(df1Columns.contains(stm.sourceField1)) joinedDf.col(stm.sourceField1).alias(stm.targetColumn) else joinedDf.col(stm.sourceField2).alias(stm.targetColumn))
val output=joinedDf.select(query:_*)
output.show
样本输出:
+----------+-----------+----------+
|newColumn1| newColumn2|newColumn3|
+----------+-----------+----------+
| John| New| xxx|
| Alex| Old| yyy|
| Jerry| New| ppp|
| Divas|replacement| zzz|
+----------+-----------+----------+
希望这种方法对您有所帮助
【讨论】:
感谢您提供这个很棒的解决方案,但我的要求是使用 Scala 而不是 SparkSQl。那么你能帮我在这里只使用 Scala 做同样的事情吗? 嗨@DivasNikhra,我只修改了数据框的解决方案 非常感谢您提供此解决方案。这正是我正在寻找的 :) 你能帮我解决这个问题吗? ***.com/questions/51928103/…以上是关于如何使用 JSON 映射文件在 Spark 中使用 Scala 生成新的 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Spark SQL 在 Parquet 文件中选择嵌套数组和映射
如何使用 Spark 加载 JSON(保存在 csv 中的路径)?
如何在 Scala 中使用 Spark SQL 返回多个 JSON 对象