Spark 数据框将嵌套的 JSON 转换为单独的列

Posted

技术标签:

【中文标题】Spark 数据框将嵌套的 JSON 转换为单独的列【英文标题】:Spark dataframes convert nested JSON to seperate columns 【发布时间】:2016-07-10 19:52:52 【问题描述】:

我有一个 JSON 流,其结构如下,可以转换为数据帧


  "a": 3936,
  "b": 123,
  "c": "34",
  "attributes": 
    "d": "146",
    "e": "12",
    "f": "23"
  

数据框显示函数结果如下

sqlContext.read.json(jsonRDD).show

+----+-----------+---+---+
|   a| attributes|  b|  c|
+----+-----------+---+---+
|3936|[146,12,23]|123| 34|
+----+-----------+---+---+

如何将属性列(嵌套 JSON 结构)拆分为 attributes.d、attributes.e 和 attributes.f 作为 单独 列到一个新的数据框,所以我可以在新数据框中将列作为 a、b、c、attributes.d、attributes.e 和 attributes.f 吗?

【问题讨论】:

【参考方案1】:

如果你想要从af 命名的列:

df.select("a", "b", "c", "attributes.d", "attributes.e", "attributes.f")

如果你想要以attributes. 前缀命名的列:

df.select($"a", $"b", $"c", $"attributes.d" as "attributes.d", $"attributes.e" as "attributes.e", $"attributes.f" as "attributes.f")

如果您的列名称是从外部来源(例如配置)提供的:

val colNames: Seq("a", "b", "c", "attributes.d", "attributes.e", "attributes.f")

df.select(colNames.head, colNames.tail: _*).toDF(colNames:_*)

【讨论】:

这对我的问题有效。谢谢!我在问题中没有提到的一件事是采用通用方法,其中列名对于数据框中的不同消息可能不同,因此要处理的列名是在外部配置的。换句话说,是否可以避免在 select 函数中对列名进行硬编码。不幸的是, select(tags.head, tags.tail: _*) 不起作用。 好的,我得到了这个: val tags: Seq("a", "b", "c", "attributes.d", "attributes.e", "attributes.f ");df.select(tags.head, tags.tail: *).toDF(tags:*).show()。如果有意义,我可以编辑你的答案?【参考方案2】:

使用 attributes.d 表示法,您可以创建新列,并将它们放在 DataFrame 中。查看 Java 中的 withColumn() 方法。

【讨论】:

【参考方案3】:

使用 Python

    使用python的pandas Lib提取DataFrame。 将数据类型从“str”更改为“dict”。 获取每个特征的值。

    将结果保存到新文件。

    import pandas as pd
    
    data = pd.read_csv("data.csv")  # load the csv file from your disk
    json_data = data['Desc']        # get the DataFrame of Desc
    data = data.drop('Desc', 1)     # delete Desc column
    Total, Defective = [], []       # setout list
    
    for i in json_data:
        i = eval(i)     # change the data type from 'str' to 'dict'
        Total.append(i['Total'])    # append 'Total' feature
        Defective.append(i['Defective'])    # append 'Defective' feature
    
    # finally,complete the DataFrame
    data['Total'] = Total
    data['Defective'] = Defective
    
    data.to_csv("result.csv")       # save to the result.csv and check it
    

【讨论】:

以上是关于Spark 数据框将嵌套的 JSON 转换为单独的列的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark 将 spark 数据帧转换为嵌套 JSON

展平任何嵌套的 json 字符串并使用 spark scala 转换为数据帧

我如何将平面数据框转换为 spark(scala 或 java)中的嵌套 json

如何将 JSON 格式的数据展平为 spark 数据框

在scala中将spark决策树模型调试字符串转换为嵌套JSON

将带有嵌套标签的 XML 读入 Spark RDD,并转换为 JSON