在读取/加载时将原始JSON保留为Spark DataFrame中的列?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在读取/加载时将原始JSON保留为Spark DataFrame中的列?相关的知识,希望对你有一定的参考价值。

在将数据读入Spark DataFrame时,我一直在寻找一种将原始(JSON)数据添加为列的方法。我有一种方法可以通过连接执行此操作,但我希望有一种方法可以在使用Spark 2.2.x +的单个操作中执行此操作。

例如数据:

{"team":"Golden Knights","colors":"gold,red,black","origin":"Las Vegas"}
{"team":"Sharks","origin": "San Jose", "eliminated":"true"}
{"team":"Wild","colors":"red,green,gold","origin":"Minnesota"}

执行时:

val logs = sc.textFile("/Users/vgk/data/tiny.json") // example data file
spark.read.json(logs).show

可以预见,我们得到:

+--------------+----------+--------------------+--------------+
|        colors|eliminated|              origin|          team|
+--------------+----------+--------------------+--------------+
|gold,red,black|      null|           Las Vegas|Golden Knights|
|          null|      true|            San Jose|        Sharks|
|red,green,gold|      null|           Minnesota|          Wild|
|red,white,blue|     false|District of Columbia|      Capitals|
+--------------+----------+--------------------+--------------+

我在初始加载时想要的是上面的内容,但是将原始JSON数据作为附加列。例如(截断的原始值):

+--------------+-------------------------------+--------------+--------------------+
|        colors|eliminated|              origin|          team|               value|
+--------------+----------+--------------------+--------------+--------------------+
|red,white,blue|     false|District of Columbia|      Capitals|{"colors":"red,wh...|
|gold,red,black|      null|           Las Vegas|Golden Knights|{"colors":"gold,r...|
|          null|      true|            San Jose|        Sharks|{"eliminated":"tr...|
|red,green,gold|      null|           Minnesota|          Wild|{"colors":"red,gr...|
+--------------+----------+--------------------+--------------+--------------------+

一个非理想的解决方案涉及一个联接:

val logs = sc.textFile("/Users/vgk/data/tiny.json")
val df = spark.read.json(logs).withColumn("uniqueID",monotonically_increasing_id)
val rawdf = df.toJSON.withColumn("uniqueID",monotonically_increasing_id)
df.join(rawdf, "uniqueID")

这导致与上面相同的数据帧,但添加和添加uniqueID列。另外,json是从DF渲染的,不一定是“原始”数据。在实践中它们是相同的,但对于我的用例,实际的原始数据是更可取的。

有人知道一个解决方案会将原始JSON数据捕获为加载时的附加列吗?

答案

如果你有一个你收到的数据的模式,那么你可以使用from_jsonschema获取所有字段并保持raw字段,因为它是

val logs = spark.sparkContext.textFile(path) // example data file

val schema = StructType(
  StructField("team", StringType, true)::
  StructField("colors", StringType, true)::
  StructField("eliminated", StringType, true)::
  StructField("origin", StringType, true)::Nil
)

logs.toDF("values")
    .withColumn("json", from_json($"values", schema))
    .select("values", "json.*")

    .show(false)

输出:

+------------------------------------------------------------------------+--------------+--------------+----------+---------+
|values                                                                  |team          |colors        |eliminated|origin   |
+------------------------------------------------------------------------+--------------+--------------+----------+---------+
|{"team":"Golden Knights","colors":"gold,red,black","origin":"Las Vegas"}|Golden Knights|gold,red,black|null      |Las Vegas|
|{"team":"Sharks","origin": "San Jose", "eliminated":"true"}             |Sharks        |null          |true      |San Jose |
|{"team":"Wild","colors":"red,green,gold","origin":"Minnesota"}          |Wild          |red,green,gold|null      |Minnesota|
+------------------------------------------------------------------------+--------------+--------------+----------+---------+

希望他的帮助!

另一答案

您可以简单地将to_json内置函数与.withColumn函数结合使用

val logs = sc.textFile("/Users/vgk/data/tiny.json")
val df = spark.read.json(logs)
import org.apache.spark.sql.functions._
df.withColumn("value", to_json(struct(df.columns.map(col): _*))).show(false)

甚至更好,不要使用sparkContexttextFile读作rdd,只需使用sparkSession读取json文件为

val df = spark.read.json("/Users/vgk/data/tiny.json")

import org.apache.spark.sql.functions._
df.withColumn("value", to_json(struct(df.columns.map(col): _*))).show(false)

你应该得到

+--------------+----------+---------+--------------+------------------------------------------------------------------------+
|colors        |eliminated|origin   |team          |value                                                                   |
+--------------+----------+---------+--------------+------------------------------------------------------------------------+
|gold,red,black|null      |Las Vegas|Golden Knights|{"colors":"gold,red,black","origin":"Las Vegas","team":"Golden Knights"}|
|null          |true      |San Jose |Sharks        |{"eliminated":"true","origin":"San Jose","team":"Sharks"}               |
|red,green,gold|null      |Minnesota|Wild          |{"colors":"red,green,gold","origin":"Minnesota","team":"Wild"}          |
+--------------+----------+---------+--------------+------------------------------------------------------------------------+

以上是关于在读取/加载时将原始JSON保留为Spark DataFrame中的列?的主要内容,如果未能解决你的问题,请参考以下文章

在 Apache Spark 中读取漂亮的打印 json 文件

如何在使用 Spark 读取时将数据分配到 X 分区?

Spark使用DataFrame读取复杂JSON中的嵌套数组

如何使用 spark 从 hbase 读取

在顶部/使用 Spark 保存和加载 JSON 和 scala 的对象

使用 spark 读取和访问 json 文件中的嵌套字段