展平任何嵌套的 json 字符串并使用 spark scala 转换为数据帧

Posted

技术标签:

【中文标题】展平任何嵌套的 json 字符串并使用 spark scala 转换为数据帧【英文标题】:Flatten any nested json string and convert to dataframe using spark scala 【发布时间】:2020-04-05 13:23:25 【问题描述】:

我正在尝试创建从任何 json 字符串到数据帧的数据帧。 json 字符串通常很深并且嵌套了一些时间。 json字符串是这样的:

val json_string = """
                   "Total Value": 3,
                   "Topic": "Example",
                   "values": [
                              
                                "value1": "#example1",
                                "points": [
                                           [
                                           "123",
                                           "156"
                                          ]
                                    ],
                                "properties": 
                                 "date": "12-04-19",
                                 "model": "Model example 1"
                                    
                                 ,
                               "value2": "#example2",
                                "points": [
                                           [
                                           "124",
                                           "157"
                                          ]
                                    ],
                                "properties": 
                                 "date": "12-05-19",
                                 "model": "Model example 2"
                                    
                                 
                              ]
                       """

我期望的输出是:

+-----------+-----------+----------+------------------+------------------+------------------------+-----------------------------+
|Total Value| Topic     |values 1 | values.points[0] | values.points[1] | values.properties.date | values.properties.model |
+-----------+-----------+----------+------------------+------------------+------------------------+-----------------------------+
| 3         | Example   | example1 | 123              | 156              | 12-04-19               |  Model Example 1         |
| 3         | Example   | example2 | 124              | 157              | 12-05-19               |    Model example 2         
+-----------+-----------+----------+------------------+------------------+------------------------+-----------------------------+

我正在做扁平化,但在 json 中选择了一些键来获取模式然后扁平化,但我不想以这种方式扁平化。它应该独立于任何要给出的键并相应地展平,如上面的输出所示。 即使在这种情况下给出了值的键之后,由于点是数组,所以我仍然得到 2 列相同的记录,因此点 [0] 为一列,而点 [1] 为不同的列。我的 Scala 火花代码是:

val key = "values" //Ideally this should not be given in my case.
val jsonFullDFSchemaString = spark.read.json(json_location).select(col(key)).schema.json; // changing values to reportData
val jsonFullDFSchemaStructType = DataType.fromJson(jsonFullDFSchemaString).asInstanceOf[StructType]
val df = spark.read.schema(jsonFullDFSchemaStructType).json(json_location);

现在我正在使用扁平化:

 def flattenDataframe(df: DataFrame): DataFrame = 
    //getting all the fields from schema
    val fields = df.schema.fields
    val fieldNames = fields.map(x => x.name)
    //length shows the number of fields inside dataframe
    val length = fields.length
    for (i <- 0 to fields.length - 1) 
      val field = fields(i)
      val fieldtype = field.dataType
      val fieldName = field.name
      fieldtype match 
        case arrayType: ArrayType =>
          val fieldName1 = fieldName
          val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName1)
          val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName1) as $fieldName1")
          //val fieldNamesToSelect = (fieldNamesExcludingArray ++ Array(s"$fieldName1.*"))
          val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
          return flattenDataframe(explodedDf)

        case structType: StructType =>
          val childFieldnames = structType.fieldNames.map(childname => fieldName + "." + childname)
          val newfieldNames = fieldNames.filter(_ != fieldName) ++ childFieldnames
          val renamedcols = newfieldNames.map(x => (col(x.toString()).as(x.toString().replace(".", "_").replace("$", "_").replace("__", "_").replace(" ", "").replace("-", ""))))
          val explodedf = df.select(renamedcols: _*)
          return flattenDataframe(explodedf)
        case _ =>
      
    
    df
  

现在终于从 json 获得扁平化数据帧:

val tableSchemaDF = flattenDataframe(df)
println(tableSchemaDF)

因此,理想情况下,任何 json 文件都应该像我上面显示的那样相应地变平,而不提供任何根键并且不创建 2 行。希望我已经提供了足够的细节。任何帮助将不胜感激。谢谢。

请注意:Json 数据来自 API,因此不确定根键“值”是否存在。这就是为什么我不打算为展平提供密钥。

【问题讨论】:

您是否验证了您的 JSON?我认为它的格式不正确。 感谢 @baithmbarek 纠正我的 json 字符串。 我认为这会帮助@Mahesh 【参考方案1】:

这是一个基于递归的解决方案,最后有点“hacky”,因为你有特殊性:

@tailrec
def recurs(df: DataFrame): DataFrame = 
  if(df.schema.fields.find(_.dataType match 
    case ArrayType(StructType(_),_) | StructType(_) => true
    case _ => false
  ).isEmpty) df
  else 
    val columns = df.schema.fields.map(f => f.dataType match 
      case _: ArrayType => explode(col(f.name)).as(f.name)
      case s: StructType => col(s"$f.name.*")
      case _ => col(f.name)
    )
    recurs(df.select(columns:_*))
  


val recursedDF = recurs(df)
val valuesColumns = recursedDF.columns.filter(_.startsWith("value"))
val projectionDF = recursedDF.withColumn("values", coalesce(valuesColumns.map(col):_*))
  .withColumn("point[0]", $"points".getItem(0))
  .withColumn("point[1]", $"points".getItem(1))
    .drop(valuesColumns :+ "points":_*)
projectionDF.show(false)

输出:

+-------+-----------+--------+---------------+---------+--------+--------+
|Topic  |Total Value|date    |model          |values   |point[0]|point[1]|
+-------+-----------+--------+---------------+---------+--------+--------+
|Example|3          |12-04-19|Model example 1|#example1|123     |156     |
|Example|3          |12-05-19|Model example 2|#example2|124     |157     |
+-------+-----------+--------+---------------+---------+--------+--------+

【讨论】:

我认为它仍然有不需要的“值”列名 问题是您想将不同的字段“合并”成一个字段。即:在您的示例中,应使用“value1”和“value2”来填充单个列。我不明白我们如何才能更通用。 嗨@baitmbarek,非常感谢你的代码真的很有帮助。如果我必须将列名指定为 values.value1、values.points 而不是 values 和 points...我必须进行哪些更改。 嗨@MohammadRijwan,您能否编辑您的问题并添加一个额外的段落来指定您期望的输出?很乐意提供帮助,但我不确定结构:) 嗨@baitmbarek,请参阅我在单独的问题中问过这个问题:***.com/questions/59873760/…【参考方案2】:

我宁愿建议使用 spark in-built 函数。您可以利用spark 函数的explode 来实现此目的。

这里是代码 sn-p。

scala> val df = spark.read.json(Seq(json_string).toDS)
scala> var dfd = df.select($"topic",$"total value",explode($"values").as("values"))

我在这里根据您的需要选择列。如果数据框中没有列,请根据您的要求添加。

scala> dfd.select($"topic",$"total value",$"values.points".getItem(0)(0).as("point_0"),$"values.points".getItem(0)(1).as("point_1"),$"values.properties.date".as("_date"),$"values.properties.model".as("_model")).show
+-------+-----------+-------+-------+--------+---------------+
|  topic|total value|point_0|point_1|   _date|         _model|
+-------+-----------+-------+-------+--------+---------------+
|Example|          3|    123|    156|12-04-19|Model example 1|
|Example|          3|    124|    157|12-05-19|Model example 2|
+-------+-----------+-------+-------+--------+---------------+

如果 JSON 中的列数有限,此方法将为您提供最佳结果。

【讨论】:

实际上 json 来自 API,因此不确定键“值”是否相同 只是一个示例。

以上是关于展平任何嵌套的 json 字符串并使用 spark scala 转换为数据帧的主要内容,如果未能解决你的问题,请参考以下文章

使用具有相同名称的嵌套子属性展平 Spark JSON 数据框

在 Spark DataFrame 中展平嵌套数组

如何将 JSON 格式的数据展平为 spark 数据框

使用 JQ 展平嵌套的 Json 对象

如何展平多个嵌套的 json 并转换为数据框?

生成嵌套 JSON(反向横向展平)