如何使用 Spark-Scala 解析 JSON 数据
Posted
技术标签:
【中文标题】如何使用 Spark-Scala 解析 JSON 数据【英文标题】:How to parse the JSON data using Spark-Scala 【发布时间】:2019-07-25 03:41:48 【问题描述】:我需要解析 JSON 数据,如下面的预期结果所示,目前我不知道如何在 Signal 列中包含信号名称(ABS、ADA、ADW)。任何帮助将非常感激。
我尝试了一些结果,如下所示,但我还需要在 SIGNAL 列中包含所有信号,这会在预期结果中显示。
jsonDF.select(explode($"ABS") as "element").withColumn("stime", col("element.E")).withColumn("can_value", col("element.V")).drop(col("element")).show()
+-------------+--------- --+
| stime|can_value |
+-------------+--------- +
|value of E |value of V |
+-------------+----------- +
df.printSchema
-- ABS: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: long (nullable = true)
|-- ADA: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: long (nullable = true)
|-- ADW: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: long (nullable = true)
|-- ALT: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
|-- APP: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
I will need output like below:
-----------------+-------------+---------+
|SIGNAL |stime |can_value|
+-----------------+-------------+---------+
|ABS |value of E |value of V |
|ADA |value of E |value of V |
|ADW |value of E |value of V |
+-----------------+-------------+---------+
【问题讨论】:
【参考方案1】:要获得预期的输出,并在 Signal 列中插入值:
jsonDF.select(explode($"ABS") as "element")
.withColumn("stime", col("element.E"))
.withColumn("can_value", col("element.V"))
.drop(col("element"))
.withColumn("SIGNAL",lit("ABS"))
.show()
以及上述方法的通用版本:
(基于 df.printSchema 的结果假设,您将信号值作为列名,并且这些列包含具有 struct(E,V) 形式的元素的数组)
val columns:Array[String] = df.columns
var arrayOfDFs:Array[DataFrame] = Array()
for(col_name <- columns)
val temp = df.selectExpr("explode("+col_name+") as element")
.select(
lit(col_name).as("SIGNAL"),
col("element.E").as("stime"),
col("element.V").as("can_value"))
arrayOfDFs = arrayOfDFs :+ temp
val jsonDF = arrayOfDFs.reduce(_ union _)
jsonDF.show(false)
【讨论】:
您好@Arati,感谢您的帮助,我到了那里,但目前遇到一个问题,根据 df.printSchema,最终的 jsonDF 重复打印仅最后一个信号名称(APP)和信号 (APP) 的最后 E、V 值。有没有办法确保 jsonDF 在退出 for 循环之前保存所有信号名称及其对应的值。任何帮助将不胜感激。 @Anil,我遇到了问题并更新了答案,请试一试。由于 '.withColumn("SIGNAL",when(col("SIGNAL").isNotNull, col("SIGNAL")).otherwise(lit(col_name)))' 在上一个答案中的 jsonDF 上,它正在复制最后一个信号所有行的名称。 感谢您的宝贵时间,我已接受您的回答 需要以下请求的帮助:我的 df.printSchema 看起来像根目录下 |-- APP: array (nullable = true) | |-- 元素:结构 (containsNull = true) | | |-- E: long (可为空=真) | | |-- V: double (nullable = true) |-- B1X: array (nullable = true) | |-- 元素:结构 (containsNull = true) | | |-- E: long (可为空=真) | | |-- V: long (nullable = true) |-- VIN: string (nullable = true) 在我的最终 jsonDF 我还需要包含 VIN (SIGNAL, STIME, CAN_VALUE, VIN) 要在最终的 jsonDF 中添加更多列,您必须在 for 循环中创建 'temp' 数据框时选择它们,如下所示:'df.selectExpr("explode("+col_name+") as element, VIN").select(.,., col("VIN"))' 你好@Arati,因为 VIN 不是地图/数组类型,我不能在分解函数中使用它。在 for(col_name VIN)'由于数据类型不匹配:函数explode的输入应该是数组或映射类型,而不是字符串;但我将在我的最终 jsonDF 上需要这个 VIN 作为(SIGNAL,stime,can_value,VIN)。任何帮助将不胜感激。以上是关于如何使用 Spark-Scala 解析 JSON 数据的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 spark-scala 在 spark 数据帧上执行枢轴?
我们如何在 Spark-Scala 和 Cataloging UDF 中注册一个函数以及其他函数?
Spark-Scala:另存为 csv 文件(RDD)[重复]
Spark - Scala:当 json 数据分布在多行时,读取 json 文件作为数据帧不起作用?