如何使用 JSON 映射文件在 Spark 中使用 Scala 生成新的 DataFrame

Posted

技术标签:

【中文标题】如何使用 JSON 映射文件在 Spark 中使用 Scala 生成新的 DataFrame【英文标题】:How to use a JSON mapping file to generate a new DataFrame in Spark using Scala 【发布时间】:2018-07-24 08:52:11 【问题描述】:

我有两个 DataFrames、DF1DF2,以及一个 JSON 文件,我需要将其用作映射文件来创建另一个数据帧 (DF3)。

DF1:

+-------+-------+-------+
|column1|column2|column3|
+-------+-------+-------+
|    100|   John| Mumbai|
|    101|   Alex|  Delhi|
|    104|  Divas|Kolkata|
|    108|  Jerry|Chennai|
+-------+-------+-------+

DF2:

+-------+-----------+-------+
|column4|    column5|column6|
+-------+-----------+-------+
|     S1|        New|    xxx|
|     S2|        Old|    yyy|
|     S5|replacement|    zzz|
|    S10|        New|    ppp|
+-------+-----------+-------+

除了这个我拥有的 JSON 格式的映射文件之外,它将用于生成 DF3。

以下是 JSON 映射文件:

"targetColumn":"newColumn1","sourceField1":"column2","sourceField2":"column4"
"targetColumn":"newColumn2","sourceField1":"column7","sourceField2":"column5"
"targetColumn":"newColumn3","sourceField1":"column8","sourceField2":"column6"

因此,我需要从这个 JSON 文件中创建 DF3,并在映射的 targetColumn 部分中提供一列,如果它存在于 DF1 中,它将检查源列,然后它从 @ 映射到 sourceField1 987654331@ 否则 sourceField2 来自 DF2

下面是预期的输出。

+----------+-----------+----------+
|newColumn1| newColumn2|newColumn3|
+----------+-----------+----------+
|      John|        New|       xxx|
|      Alex|        Old|       yyy|
|     Divas|replacement|       zzz|
|     Jerry|        New|       ppp|
+----------+-----------+----------+

这里的任何帮助都将被占用。

【问题讨论】:

你尝试过 Divas 什么? org.apache.spark.sql -> from_json + schema,全部内置 @Pavel 你能举一个例子我如何在我的代码中实现这一点吗?因为我是 spark 的新手 ds.withColumn("my_json_column", from_json( col("my_json_column"), validJsonSchema) @Pavel 这不起作用。你能在这里解释一下 my_json_column 是什么吗 【参考方案1】:

解析JSON 并创建下面的List 自定义对象

case class SrcTgtMapping(targetColumn:String,sourceField1:String,sourceField2:String)
val srcTgtMappingList=List(SrcTgtMapping("newColumn1","column2","column4"),SrcTgtMapping("newColumn2","column7","column5"),SrcTgtMapping("newColumn3","column8","column6"))

将虚拟index column 添加到dataframes,并基于index column 加入dataframes

import org.apache.spark.sql.functions._

val df1WithIndex=df1.withColumn("index",monotonicallyIncreasingId)
val df2WithIndex=df2.withColumn("index",monotonicallyIncreasingId)
val joinedDf=df1WithIndex.join(df2WithIndex,df1WithIndex.col("index")===df2WithIndex.col("index"))

创建query 并执行它。

val df1Columns=df1WithIndex.columns.toList
val df2Columns=df2WithIndex.columns.toList
val query=srcTgtMappingList.map(stm=>if(df1Columns.contains(stm.sourceField1)) joinedDf.col(stm.sourceField1).alias(stm.targetColumn) else joinedDf.col(stm.sourceField2).alias(stm.targetColumn))
val output=joinedDf.select(query:_*)
output.show

样本输出:

+----------+-----------+----------+
|newColumn1| newColumn2|newColumn3|
+----------+-----------+----------+
|      John|        New|       xxx|
|      Alex|        Old|       yyy|
|     Jerry|        New|       ppp|
|     Divas|replacement|       zzz|
+----------+-----------+----------+

希望这种方法对您有所帮助

【讨论】:

感谢您提供这个很棒的解决方案,但我的要求是使用 Scala 而不是 SparkSQl。那么你能帮我在这里只使用 Scala 做同样的事情吗? 嗨@DivasNikhra,我只修改了数据框的解决方案 非常感谢您提供此解决方案。这正是我正在寻找的 :) 你能帮我解决这个问题吗? ***.com/questions/51928103/…

以上是关于如何使用 JSON 映射文件在 Spark 中使用 Scala 生成新的 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Spark SQL 在 Parquet 文件中选择嵌套数组和映射

如何使用 Spark 加载 JSON(保存在 csv 中的路径)?

如何在 Scala 中使用 Spark SQL 返回多个 JSON 对象

Spark RDD - 使用额外参数进行映射

如何基于相等性检查在 Spark 中使用内部数组查询嵌套 json

使用 Pyspark 使用 Spark 读取巨大 Json 文件的第一行