如何使用 Spark scala 从字符串格式的复杂 JSON 创建数据帧

Posted

技术标签:

【中文标题】如何使用 Spark scala 从字符串格式的复杂 JSON 创建数据帧【英文标题】:How can i create a dataframe from a complex JSON in string format using Spark scala 【发布时间】:2021-11-03 12:01:09 【问题描述】:

我想使用 Spark scala 从字符串格式的复杂 JSON 创建数据帧。

Spark 版本是 3.1.2。 Scala 版本是 2.12.14。

源数据如下:


  "info": [
    
      "done": "time",
      "id": 9,
      "type": "normal",
      "pid": 202020,
      "add": 
        "fields": true,
        "stat": "not sure"
      
    ,
    
      "done": "time",
      "id": 14,
      "type": "normal",
      "pid": 764310,
      "add": 
        "fields": true,
        "stat": "sure"
      
    ,
    
      "done": "time",
      "id": 9,
      "type": "normal",
      "pid": 202020,
      "add": 
        "note": 
          "id": 922,
          "score": 0
        
      
    
  ],
  "more": 
    "a": "ok",
    "b": "fine",
    "c": 3
  

我尝试了以下操作,但没有工作。

val schema = new StructType().add("info", ArrayType(StringType)).add("more", StringType)

val rdd = ss.sparkContext.parallelize(Seq(Row(data))) // data is as mentioned above JSON

val df = ss.createDataFrame(rdd, schema)

df.printSchema()

模式打印如下

root
 |-- info: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- more: string (nullable = true)

    print(df.head())

Above line throws exception java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of array<string>

请帮我做这件事。

【问题讨论】:

嗨,欢迎来到堆栈溢出,请阅读this。问题中缺少的主要内容是您到目前为止尝试了什么?因为通过谷歌搜索可以找到关于这个场景的数百万个教程。 【参考方案1】:

如果数据位于 HDFS/S3 等文件中,您可以使用 spark.read.json 函数轻松准备它们

这样的东西应该适用于 hdfs

val df = spark.read.option("multiline","true").json("hdfs:///home/vikas/sample/*.json")

在 s3 上会是

val df = spark.read.option("multiline","true").json("s3a://vikas/sample/*.json")

请确保您对读取文件的路径具有读取权限

正如您在评论中提到的,您正在从 API 读取数据,在这种情况下,以下内容应该适用于 spark 2.2 及更高版本

import spark.implicits._
val jsonStr = """ "metadata":  "key": 84896, "value": 54 """
val df = spark.read.json(Seq(jsonStr).toDS)

【讨论】:

感谢您的回复。这就是限制,我们有从 API 接收的字符串形式的数据。在文件中存储和读取不会有效地使用 Spark 在内存处理中。 @Arjun,如果这对你有用,请告诉我!! 我已经尝试过了,但是从 Seq(data) 它没有显示使用 toDS 方法转换为数据集的方法。所以看来我们需要使用 createDataframe 方法直接创建数据框。只需检查我的火花版本 @Arjun,您需要执行import spark.implicits._ 将序列转换为数据框或数据集 感谢您对深度的指导。您的方式有效,但在获取数据时会引起问题。无论如何,我找到了一种处理字符串数据本身的成功方法。我也会在这里写下来。再次感谢您。【参考方案2】:

我通过这样做找到了解决方案,为我工作:

val schema = new StructType().add("data", StringType)

val rdd = ss.sparkContext.parallelize(Seq(Row(data)))

val df = ss.createDataFrame(rdd, schema)

df.printSchema()

println(df.head().getAs("data").toString)

【讨论】:

请添加更多详细信息以扩展您的答案,例如工作代码或文档引用。 已经提到的代码足以成功运行解决方案

以上是关于如何使用 Spark scala 从字符串格式的复杂 JSON 创建数据帧的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Scala 从 Spark 中的列表或数组创建行

Scala Spark 中的 udf 运行时错误

在 spark DataFrame-Scala 中格式化 TimestampType

如何使用反射从scala调用spark UDF?

Scala中的复和类型

可以将 mlflow.spark 保存的模型加载为 Spark/Scala 管道吗?