将 spark 的数据框列转换为 json 对象
Posted
技术标签:
【中文标题】将 spark 的数据框列转换为 json 对象【英文标题】:Convert spark's dataframe columns to json object 【发布时间】:2021-01-21 18:02:31 【问题描述】:我有一个包含以下数据的数据框
+-----------+-------|-----|
|file_name | key |Value|
+-----------+-------+-----+
| file1 | key1 | 7 |
| file1 | key2 | 11 |
| file1 | key3 | 3 |
| file2 | key1 | 9 |
| file2 | key2 | 2 |
| file2 | key3 | 10 |
+-----------+-------+-----+
通过以下代码,我已经解决了我的问题的一步
dataset.select(col("file_name"), to_json(struct(col("key").alias("key"),col("value").alias("value"))).alias("output"))
.groupBy(col("file_name")).agg(collect_list(col("output")).alias("output"))
.show(false);
这给了我这样的输出 -
+-----------+-------------------------------------------------------------------------------------|
|file_name | output |
+-----------+-------------------------------------------------------------------------------------|
| file1 |["key":"key1","value":"7", "key":"key2","value":"11", "key":"key3","value":"3"]|
| file2 |["key":"key1","value":"9", "key":"key2","value":"2", "key":"key3","value":"10"]|
+-----------+-------------------------------------------------------------------------------------|
但是我希望我的最终输出采用以下 json 结构。您能否建议我进行任何更改以获取以下格式的输出(json 对象包含 json 数组)。
+-----------+----------------------------------------------------------------------------------------------|
|file_name | output |
+-----------+----------------------------------------------------------------------------------------------|
| file1 |"result":["key":"key1","value":"7","key":"key2","value":"11","key":"key3","value":"3"]|
| file2 |"result":["key":"key1","value":"9","key":"key2","value":"2","key":"key3","value":"10"]|
+-----------+----------------------------------------------------------------------------------------------|
【问题讨论】:
【参考方案1】:您可以在调用to_json
之前将结果放入结构中。请注意,您不应两次调用 to_json
,因为这会导致双引号。
dataset.groupBy("file_name").agg(
to_json(
struct(
collect_list(struct("key", "value")).alias("result")
)
).alias("output")
).show(false)
+---------+----------------------------------------------------------------------------------------------+
|file_name|output |
+---------+----------------------------------------------------------------------------------------------+
|file2 |"result":["key":"key1","value":"9","key":"key2","value":"2","key":"key3","value":"10"]|
|file1 |"result":["key":"key1","value":"7","key":"key2","value":"11","key":"key3","value":"3"]|
+---------+----------------------------------------------------------------------------------------------+
【讨论】:
【参考方案2】:尝试添加另一个select
语句:
select(col("file_name"), to_json(struct(col("output").alias("result"))).alias("output"))
代码应该是这样的:
dataset.select(col("file_name"), to_json(struct(col("key").alias("key"),col("value").alias("value"))).alias("output"))
.groupBy(col("file_name")).agg(collect_list(col("output")).alias("output"))
.select(col("file_name"), to_json(struct(col("output").alias("result"))).alias("output"))
.show(false);
【讨论】:
以上是关于将 spark 的数据框列转换为 json 对象的主要内容,如果未能解决你的问题,请参考以下文章
pySpark - 在插入数据库之前将整个数据框列转换为 JSON 对象
Apache Spark - 将 UDF 的结果分配给多个数据框列