如何使用 Spark-Scala 解析 JSON 数据

Posted

技术标签:

【中文标题】如何使用 Spark-Scala 解析 JSON 数据【英文标题】:How to parse the JSON data using Spark-Scala 【发布时间】:2019-07-25 03:41:48 【问题描述】:

我需要解析 JSON 数据,如下面的预期结果所示,目前我不知道如何在 Signal 列中包含信号名称(ABS、ADA、ADW)。任何帮助将非常感激。

我尝试了一些结果,如下所示,但我还需要在 SIGNAL 列中包含所有信号,这会在预期结果中显示。

jsonDF.select(explode($"ABS") as "element").withColumn("stime", col("element.E")).withColumn("can_value", col("element.V")).drop(col("element")).show()

+-------------+--------- --+
|        stime|can_value   |
+-------------+---------   +
|value of E   |value of V  |
+-------------+----------- +

df.printSchema

 -- ABS: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- ADA: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- ADW: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- ALT: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: double (nullable = true)
 |-- APP: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: double (nullable = true)

I will need output like below:

-----------------+-------------+---------+
|SIGNAL        |stime            |can_value|
+-----------------+-------------+---------+
|ABS           |value of E   |value of V  |
|ADA           |value of E   |value of V  |
|ADW           |value of E   |value of V  |
+-----------------+-------------+---------+

【问题讨论】:

【参考方案1】:

要获得预期的输出,并在 Signal 列中插入值:

jsonDF.select(explode($"ABS") as "element")
    .withColumn("stime", col("element.E"))
    .withColumn("can_value", col("element.V"))
    .drop(col("element"))
    .withColumn("SIGNAL",lit("ABS"))
    .show()

以及上述方法的通用版本:

(基于 df.printSchema 的结果假设,您将信号值作为列名,并且这些列包含具有 struct(E,V) 形式的元素的数组)

val columns:Array[String] = df.columns

var arrayOfDFs:Array[DataFrame] = Array()

for(col_name <- columns)

  val temp = df.selectExpr("explode("+col_name+") as element")
    .select(
      lit(col_name).as("SIGNAL"),
      col("element.E").as("stime"),
      col("element.V").as("can_value"))

  arrayOfDFs = arrayOfDFs :+ temp


val jsonDF = arrayOfDFs.reduce(_ union _)
jsonDF.show(false)

【讨论】:

您好@Arati,感谢您的帮助,我到了那里,但目前遇到一个问题,根据 df.printSchema,最终的 jsonDF 重复打印仅最后一个信号名称(APP)和信号 (APP) 的最后 E、V 值。有没有办法确保 jsonDF 在退出 for 循环之前保存所有信号名称及其对应的值。任何帮助将不胜感激。 @Anil,我遇到了问题并更新了答案,请试一试。由于 '.withColumn("SIGNAL",when(col("SIGNAL").isNotNull, col("SIGNAL")).otherwise(lit(col_name)))' 在上一个答案中的 jsonDF 上,它正在复制最后一个信号所有行的名称。 感谢您的宝贵时间,我已接受您的回答 需要以下请求的帮助:我的 df.printSchema 看起来像根目录下 |-- APP: array (nullable = true) | |-- 元素:结构 (containsNull = true) | | |-- E: long (可为空=真) | | |-- V: double (nullable = true) |-- B1X: array (nullable = true) | |-- 元素:结构 (containsNull = true) | | |-- E: long (可为空=真) | | |-- V: long (nullable = true) |-- VIN: string (nullable = true) 在我的最终 jsonDF 我还需要包含 VIN (SIGNAL, STIME, CAN_VALUE, VIN) 要在最终的 jsonDF 中添加更多列,您必须在 for 循环中创建 'temp' 数据框时选择它们,如下所示:'df.selectExpr("explode("+col_name+") as element, VIN").select(.,., col("VIN"))' 你好@Arati,因为 VIN 不是地图/数组类型,我不能在分解函数中使用它。在 for(col_name VIN)'由于数据类型不匹配:函数explode的输入应该是数组或映射类型,而不是字符串;但我将在我的最终 jsonDF 上需要这个 VIN 作为(SIGNAL,stime,can_value,VIN)。任何帮助将不胜感激。

以上是关于如何使用 Spark-Scala 解析 JSON 数据的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 spark-scala 在 spark 数据帧上执行枢轴?

我们如何在 Spark-Scala 和 Cataloging UDF 中注册一个函数以及其他函数?

Spark-Scala:另存为 csv 文件(RDD)[重复]

Spark - Scala:当 json 数据分布在多行时,读取 json 文件作为数据帧不起作用?

Spark-Scala 无法推断架构(将输入路径验证推迟到数据源中)

以动态方式找到Spark-Scala中的百分位数