如何使用新列对 Spark dataFrame 中的字符串字段进行 JSON 转义

Posted

技术标签:

【中文标题】如何使用新列对 Spark dataFrame 中的字符串字段进行 JSON 转义【英文标题】:How to JSON-escape a String field in Spark dataFrame with new column 【发布时间】:2019-06-14 16:29:28 【问题描述】:

如何通过 DataFrame 编写 JSON 格式的新列。我尝试了几种方法,但它将数据写入 JSON 转义字符串字段。 目前其写作为 "test":"id":1,"name":"name","problem_field": "\"x\":100,\"y\":200"

相反,我希望它是 "test":"id":1,"name":"name","problem_field": "x":100,"y":200

problem_field 是基于从其他字段读取的值创建的新列:

val dataFrame = oldDF.withColumn("problem_field", s)

我尝试了以下方法

    dataFrame.write.json(<<outputPath>>) dataFrame.toJSON.map(value => value.replace("\\", "").replace("\"value\":\"", "").replace("\"", "")).write.json(<<outputPath>>)

也尝试转换为DataSet,但没有成功。任何指针都非常感谢。

这里提到的逻辑我已经试过了:How to let Spark parse a JSON-escaped String field as a JSON Object to infer the proper structure in DataFrames?

【问题讨论】:

【参考方案1】:

首先,您的示例数据在 "y\":200 之后有一个多余的逗号,这将阻止它被解析,因为它不是有效的 JSON。

假设您知道架构,您可以从那里使用from_json 解析该字段。在此示例中,我将分别解析字段以首先获取架构:

scala> val json = spark.read.json(Seq(""""test":"id":1,"name":"name","problem_field": "\"x\":100,\"y\":200"""").toDS)
json: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> json.printSchema
root
 |-- test: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: string (nullable = true)


scala> val problem_field = spark.read.json(json.select($"test.problem_field").map
case org.apache.spark.sql.Row(x : String) => x
)
problem_field: org.apache.spark.sql.DataFrame = [x: bigint, y: bigint]          

scala> problem_field.printSchema
root
 |-- x: long (nullable = true)
 |-- y: long (nullable = true)

scala> val fixed = json.withColumn("test", struct($"test.id", $"test.name", from_json($"test.problem_field", problem_field.schema).as("problem_field")))
fixed: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> fixed.printSchema
root
 |-- test: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: struct (nullable = true)
 |    |    |-- x: long (nullable = true)
 |    |    |-- y: long (nullable = true)

如果problem_fields 内容的架构在行之间不一致,此解决方案仍然有效,但可能不是处理事情的最佳方式,因为它会产生一个稀疏数据框,其中每行包含@987654325 中遇到的每个字段@。例如:

scala> val json = spark.read.json(Seq(""""test":"id":1,"name":"name","problem_field": "\"x\":100,\"y\":200"""", """"test":"id":1,"name":"name","problem_field": "\"a\":10,\"b\":20"""").toDS)
json: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> val problem_field = spark.read.json(json.select($"test.problem_field").mapcase org.apache.spark.sql.Row(x : String) => x)
problem_field: org.apache.spark.sql.DataFrame = [a: bigint, b: bigint ... 2 more fields]

scala> problem_field.printSchema
root
 |-- a: long (nullable = true)
 |-- b: long (nullable = true)
 |-- x: long (nullable = true)
 |-- y: long (nullable = true)

scala> val fixed = json.withColumn("test", struct($"test.id", $"test.name", from_json($"test.problem_field", problem_field.schema).as("problem_field")))
fixed: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> fixed.printSchema
root
 |-- test: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: struct (nullable = true)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- x: long (nullable = true)
 |    |    |-- y: long (nullable = true)

scala> fixed.select($"test.problem_field.*").show
+----+----+----+----+
|   a|   b|   x|   y|
+----+----+----+----+
|null|null| 100| 200|
|  10|  20|null|null|
+----+----+----+----+

在数百、数千或数百万行的过程中,您可以看到这将如何产生问题。

【讨论】:

感谢查理的评论,“y\”:200 后面的多余逗号是错字,可以忽略。来到解决方案,在我的情况下没有。 problem_field 中的字段数不固定。我正在动态创建此字段,每条记录将有不同数量的字段。怎么可能处理?谢谢。 如果是这种情况,就没有好的解决方案 - 如果problem_field 中的 JSON 是任意且不一致的,那么您的数据框也是如此。您最好将其保留为字符串并使用 get_json_object 临时提取各个字段,或者解决导致您达到这一点的数据建模问题。 我已经编辑了我的解决方案,以说明它适用于您所描述的情况,但可能不是处理您的问题的最佳方式。 好的,会检查的。谢谢。

以上是关于如何使用新列对 Spark dataFrame 中的字符串字段进行 JSON 转义的主要内容,如果未能解决你的问题,请参考以下文章

如何向 Spark DataFrame 添加新列(使用 PySpark)?

使用具有常量值的 var 在 Spark DataFrame 中创建一个新列

在新列上过滤 Spark DataFrame

PySpark:Spark Dataframe - 将 ImageSchema 列转换为 nDArray 作为新列

如何使用 Scala/Spark 添加不基于数据框中现有列的新列? [复制]

在 PySpark 中,如何根据另一个 DataFrame 中的查找来填充新列?