嵌套的json扁平化火花数据框
Posted
技术标签:
【中文标题】嵌套的json扁平化火花数据框【英文标题】:nested json flattening spark dataframe 【发布时间】:2020-10-07 04:30:50 【问题描述】:我正在尝试从嵌套的 jsonString 创建一个数据帧并拆分为多个数据帧,即外部元素数据将转到一个数据帧,而嵌套的子数据将转到另一个数据帧。可能有多个嵌套元素。我查看了其他帖子,它们都没有为以下场景提供工作示例。下面是一个状态数是动态的示例,我想将国家信息和状态信息存储在 2 个单独的 hdfs 文件夹中。所以父数据框保持如下一行。
val jsonStr=""""country":"US","ISD":"001","states":["state1":"NJ","state2":"NY","state3 ":"PA"]"""
val countryDf = spark.read.json(Seq(jsonStr).toDS)
countryDf.show(false)
+---+-------+--------------+
|ISD|country|states |
+---+-------+--------------+
|001|US |[[NJ, NY, PA]]|
+---+-------+--------------+
countryDf.withColumn("states",explode($"states")).show(false)
val statesDf = countryDf.select(explode(countryDf("states").as("states")))
statesDf.show(false)
+------------+
|col |
+------------+
|[NJ, NY, PA]|
+------------+
Expected out put
2 Dataframes
countryDf
+---+-------+
|ISD|country|
+---+-------+
|001|US |
+---+-------+
statesDf
+------+-------+-------+-------+
country| state1| state2| state3
+------+---------------+-------+
US | NJ NY PA
+------+-------+-------+-------+
我查看了堆栈溢出中有关嵌套 json flattening 的其他问题。没有人有相同的工作解决方案。
【问题讨论】:
将 statesDf 扁平化为列而不是行不是更好吗?总是有固定数量的状态吗? 感谢 Shailesh 没有状态是动态的 【参考方案1】:这里有一些代码可以完成这项工作。您应该考虑性能以及列数是否非常大。我已经收集了所有地图字段并将它们添加到数据框中。
val jsonStr=""""country":"US","ISD":"001","states":["state1":"NJ","state2":"NY","state3":"PA"]"""
import spark.implicits._
val countryDf = spark.read.json(Seq(jsonStr).toDS)
countryDf.show(false)
val statesDf = countryDf.select($"country", explode($"states").as("states"))
val index = statesDf.schema.fieldIndex("states")
val stateSchema = statesDf.schema(index).dataType.asInstanceOf[StructType]
var columns = mutable.LinkedHashSet[Column]()
stateSchema.fields.foreach(field =>
columns.add(lit(field.name))
columns.add(col( "state." + field.name))
)
val s2 = statesDf
.withColumn("statesMap", map(columns.toSeq: _*))
val allMapKeys = s2.select(explode($"statesMap")).select($"key").distinct.collect().map(_.get(0).toString)
val s3 = allMapKeys.foldLeft(s2)((a, b) => a.withColumn(b, a("statesMap")(b)))
.drop("statesMap")
s3.show(false)
【讨论】:
【参考方案2】:当您读取嵌套 JSON 并将其转换为数据集时,嵌套部分将存储为结构类型。因此,您必须考虑在数据框中展平结构类型。
val jsonStr=""""country":"US","ISD":"001","states":["state1":"NJ","state2":"NY","state3":"PA"]"""
val countryDf = spark.read.json(Seq(jsonStr).toDS)
countryDf.show(false)
+---+-------+--------------+
|ISD|country|states |
+---+-------+--------------+
|001|US |[[NJ, NY, PA]]|
+---+-------+--------------+
val countryDfExploded = countryDf.withColumn("states",explode($"states"))
countryDfExploded.show(false)
+---+-------+------------+
|ISD|country|states |
+---+-------+------------+
|001|US |[NJ, NY, PA]|
+---+-------+------------+
val countrySelectDf = countryDfExploded.select($"ISD", $"country")
countrySelectDf.show(false)
+---+-------+
|ISD|country|
+---+-------+
|001|US |
+---+-------+
val statesDf = countryDfExploded.select( $"country",$"states.*")
statesDf.show(false)
+-------+------+------+------+
|country|state1|state2|state3|
+-------+------+------+------+
|US |NJ |NY |PA |
+-------+------+------+------+
【讨论】:
以上是关于嵌套的json扁平化火花数据框的主要内容,如果未能解决你的问题,请参考以下文章
扁平化深度嵌套的 JSON 以获取 Dataframe 的最快和通用方法是啥?
使用 pandas json_normalize 扁平化包含多个嵌套列表的字典列表