Keep raw JSON as a column in a Spark DataFrame when reading/loading?
When reading data into a Spark DataFrame, I have been looking for a way to add the original (raw JSON) data as a column. I have an approach that does this with a join, but I am hoping for a way to do it in a single operation using Spark 2.2.x+.
Example data:
{"team":"Golden Knights","colors":"gold,red,black","origin":"Las Vegas"}
{"team":"Sharks","origin": "San Jose", "eliminated":"true"}
{"team":"Wild","colors":"red,green,gold","origin":"Minnesota"}
When executing:
val logs = sc.textFile("/Users/vgk/data/tiny.json") // example data file
spark.read.json(logs).show
Predictably, we get:
+--------------+----------+--------------------+--------------+
|        colors|eliminated|              origin|          team|
+--------------+----------+--------------------+--------------+
|gold,red,black|      null|           Las Vegas|Golden Knights|
|          null|      true|            San Jose|        Sharks|
|red,green,gold|      null|           Minnesota|          Wild|
|red,white,blue|     false|District of Columbia|      Capitals|
+--------------+----------+--------------------+--------------+
What I want at initial load is the above, but with the raw JSON data as an additional column. For example (raw values truncated):
+--------------+----------+--------------------+--------------+--------------------+
|        colors|eliminated|              origin|          team|               value|
+--------------+----------+--------------------+--------------+--------------------+
|red,white,blue|     false|District of Columbia|      Capitals|{"colors":"red,wh...|
|gold,red,black|      null|           Las Vegas|Golden Knights|{"colors":"gold,r...|
|          null|      true|            San Jose|        Sharks|{"eliminated":"tr...|
|red,green,gold|      null|           Minnesota|          Wild|{"colors":"red,gr...|
+--------------+----------+--------------------+--------------+--------------------+
A non-ideal solution involves a join:
import org.apache.spark.sql.functions.monotonically_increasing_id

val logs = sc.textFile("/Users/vgk/data/tiny.json")
val df = spark.read.json(logs).withColumn("uniqueID", monotonically_increasing_id)
val rawdf = df.toJSON.withColumn("uniqueID", monotonically_increasing_id)
df.join(rawdf, "uniqueID")
This results in the same DataFrame as above, but with an added uniqueID column. Also, the JSON is rendered from the DataFrame and is not necessarily the "raw" data. In practice they are identical, but for my use case the actual raw data is preferable.
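(To see why the two differ, here is a small illustrative sketch reusing the logs RDD from above: toJSON re-serializes the parsed rows, so whitespace and key order change and null fields are dropped, even though the values themselves match.)
// Illustrative: print the re-rendered JSON lines and compare them with the file.
// Raw line on disk:      {"team":"Sharks","origin": "San Jose", "eliminated":"true"}
// Re-rendered by toJSON: {"eliminated":"true","origin":"San Jose","team":"Sharks"}
spark.read.json(logs).toJSON.collect().foreach(println)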
Does anyone know of a solution that captures the raw JSON data as an additional column at load time?
Answer
If you have a schema for the data you receive, then you can use from_json with that schema to get all the fields while keeping the raw field as it is:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._

val logs = spark.sparkContext.textFile(path) // example data file

val schema = StructType(
  StructField("team", StringType, true) ::
  StructField("colors", StringType, true) ::
  StructField("eliminated", StringType, true) ::
  StructField("origin", StringType, true) :: Nil
)

logs.toDF("values")
  .withColumn("json", from_json($"values", schema))
  .select("values", "json.*")
  .show(false)
Output:
+------------------------------------------------------------------------+--------------+--------------+----------+---------+
|values                                                                  |team          |colors        |eliminated|origin   |
+------------------------------------------------------------------------+--------------+--------------+----------+---------+
|{"team":"Golden Knights","colors":"gold,red,black","origin":"Las Vegas"}|Golden Knights|gold,red,black|null      |Las Vegas|
|{"team":"Sharks","origin": "San Jose", "eliminated":"true"}             |Sharks        |null          |true      |San Jose |
|{"team":"Wild","colors":"red,green,gold","origin":"Minnesota"}          |Wild          |red,green,gold|null      |Minnesota|
+------------------------------------------------------------------------+--------------+--------------+----------+---------+
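As a side note, the same approach can be written without the RDD step at all: spark.read.text already returns a DataFrame with a single string column named value, which can then be parsed with from_json. A minimal sketch, assuming the same file path and the schema val defined above:
import org.apache.spark.sql.functions.from_json
import spark.implicits._

// spark.read.text yields one StringType column named "value" per input line
spark.read.text("/Users/vgk/data/tiny.json")
  .withColumn("json", from_json($"value", schema)) // `schema` as defined above
  .select("value", "json.*")
  .show(false)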
Hope this helps!
Another answer
You can simply use the to_json built-in function together with the .withColumn function:
val logs = sc.textFile("/Users/vgk/data/tiny.json")
val df = spark.read.json(logs)
import org.apache.spark.sql.functions._
df.withColumn("value", to_json(struct(df.columns.map(col): _*))).show(false)
Or, even better, instead of reading the data as an rdd with sparkContext's textFile, just use sparkSession to read the JSON file directly:
val df = spark.read.json("/Users/vgk/data/tiny.json")
import org.apache.spark.sql.functions._
df.withColumn("value", to_json(struct(df.columns.map(col): _*))).show(false)
You should get:
+--------------+----------+---------+--------------+------------------------------------------------------------------------+
|colors        |eliminated|origin   |team          |value                                                                   |
+--------------+----------+---------+--------------+------------------------------------------------------------------------+
|gold,red,black|null      |Las Vegas|Golden Knights|{"colors":"gold,red,black","origin":"Las Vegas","team":"Golden Knights"}|
|null          |true      |San Jose |Sharks        |{"eliminated":"true","origin":"San Jose","team":"Sharks"}               |
|red,green,gold|null      |Minnesota|Wild          |{"colors":"red,green,gold","origin":"Minnesota","team":"Wild"}          |
+--------------+----------+---------+--------------+------------------------------------------------------------------------+
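If the truly raw line is what matters but you would rather not hand-write the schema, a further option (a sketch of my own, not part of the original answers) is to let Spark infer the schema in a first pass over the file and then parse the text yourself, so the stored string stays exactly what was on disk:
import org.apache.spark.sql.functions.from_json
import spark.implicits._

val path = "/Users/vgk/data/tiny.json"

// First pass: infer the schema from the JSON file itself
val inferredSchema = spark.read.json(path).schema

// Second pass: keep each raw line verbatim and parse it alongside
spark.read.text(path)
  .withColumnRenamed("value", "raw")
  .withColumn("parsed", from_json($"raw", inferredSchema))
  .select("parsed.*", "raw")
  .show(false)
Note that this reads the file twice (once for schema inference), which may or may not be acceptable for large inputs.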