Convert a Spark DataFrame with a schema to a DataFrame of JSON strings



Question (posted 2018-01-14 18:05:56):

I have a DataFrame like this:

+--+--------+--------+----+-------------+------------------------------+
|id|name    |lastname|age |timestamp    |creditcards                   |
+--+--------+--------+----+-------------+------------------------------+
|1 |michel  |blanc   |35  |1496756626921|[[hr6,3569823], [ee3,1547869]]|
|2 |peter   |barns   |25  |1496756626551|[[ye8,4569872], [qe5,3485762]]|
+--+--------+--------+----+-------------+------------------------------+

The schema of my df is as follows:

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- age: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- creditcards: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- number: string (nullable = true)

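I want to convert each row into a JSON string that follows my schema, so that the resulting DataFrame has a single column containing the JSON strings. The first row should look like this: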

{
    "id":"1",
    "name":"michel",
    "lastname":"blanc",
    "age":"35",
    "timestamp":"1496756626921",
    "creditcards":[
        {
            "id":"hr6",
            "number":"3569823"
        },
        {
            "id":"ee3",
            "number":"1547869"
        }
    ]
}

The second row of the DataFrame should look like this:

{
    "id":"2",
    "name":"peter",
    "lastname":"barns",
    "age":"25",
    "timestamp":"1496756626551",
    "creditcards":[
        {
            "id":"ye8",
            "number":"4569872"
        },
        {
            "id":"qe5",
            "number":"3485762"
        }
    ]
}

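My goal is not to write the DataFrame to a JSON file. My goal is to convert df1 into a second DataFrame, df2, so that I can push each JSON row of df2 to a Kafka topic. I use this code to create the DataFrame: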

    val line1 = """{"id":"1","name":"michel","lastname":"blanc","age":"35","timestamp":"1496756626921","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3","number":"1547869"}]}"""
    val line2 = """{"id":"2","name":"peter","lastname":"barns","age":"25","timestamp":"1496756626551","creditcards":[{"id":"ye8","number":"4569872"}, {"id":"qe5","number":"3485762"}]}"""

    val rdd = sc.parallelize(Seq(line1, line2))
    val df = sqlContext.read.json(rdd)
    df.show(false)
    df.printSchema()

Any ideas?


Answer 1:

If all you need is a single-column DataFrame/Dataset in which each value is the JSON representation of one row of the original DataFrame, just apply toJSON to your DataFrame, as shown below:

df.show(false)
// +---+------------------------------+---+--------+------+-------------+
// |age|creditcards                   |id |lastname|name  |timestamp    |
// +---+------------------------------+---+--------+------+-------------+
// |35 |[[hr6,3569823], [ee3,1547869]]|1  |blanc   |michel|1496756626921|
// |25 |[[ye8,4569872], [qe5,3485762]]|2  |barns   |peter |1496756626551|
// +---+------------------------------+---+--------+------+-------------+

val dsJson = df.toJSON
// dsJson: org.apache.spark.sql.Dataset[String] = [value: string]

dsJson.show(false)
// +--------------------------------------------------------------------------+
// |value                                                                     |
// +--------------------------------------------------------------------------+
// |{"age":"35","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3",...|
// |{"age":"25","creditcards":[{"id":"ye8","number":"4569872"},{"id":"qe5",...|
// +--------------------------------------------------------------------------+
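Since the stated goal is to push each JSON row to Kafka, here is a minimal sketch of that last step, assuming Spark 2.2+ (batch writes to Kafka) and the spark-sql-kafka-0-10 package on the classpath. It relies on toJSON naming its single column value, which is the column Spark's Kafka sink reads the message payload from. The object and method names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Dataset}

// Hypothetical helper object; names are illustrative only.
object KafkaJsonSink {
  // One JSON string per row; toJSON names the single column "value".
  def toJsonValues(df: DataFrame): Dataset[String] = df.toJSON

  // Batch-writes every JSON row as one Kafka message payload.
  // Assumes Spark 2.2+ with spark-sql-kafka-0-10 on the classpath at run time.
  def publish(df: DataFrame, bootstrapServers: String, topic: String): Unit =
    toJsonValues(df).write
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("topic", topic)
      .save()
}
```

Usage would look like `KafkaJsonSink.publish(df, "localhost:9092", "people-json")`, where the broker address and topic name are placeholders.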

[Update]

To add name as an extra column, you can extract it from the JSON column with from_json:

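import org.apache.spark.sql.functions.from_json
import spark.implicits._ // enables the $"..." column syntax

val result = dsJson.withColumn("name", from_json($"value", df.schema)("name"))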

result.show
// +--------------------+------+
// |               value|  name|
// +--------------------+------+
// |{"age":"35","cred...|michel|
// |{"age":"25","cred...| peter|
// +--------------------+------+
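An alternative that avoids serialising and then parsing the JSON again is to build the JSON column with to_json(struct(...)) directly from the original DataFrame and keep name as an ordinary column. This is a swapped-in technique, not the one used in this answer; it is a minimal sketch assuming Spark 2.2+ (to_json on structs that contain arrays of structs), and JsonWithKey and withJsonValue are hypothetical names.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct, to_json}

// Hypothetical helper; not the approach used in the answer above.
object JsonWithKey {
  // Packs all columns of `df` into one JSON string column named "value"
  // and keeps `keyCol` next to it as an ordinary column.
  def withJsonValue(df: DataFrame, keyCol: String): DataFrame =
    df.select(
      col(keyCol),
      to_json(struct(df.columns.map(col): _*)).as("value")
    )
}
```

For the question's df, `JsonWithKey.withJsonValue(df, "name")` should give a name column plus a value column holding the same JSON that df.toJSON produces.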


Answer 2:

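To do this, you can convert your DataFrame directly into a Dataset of JSON strings with:

import org.apache.spark.sql.{DataFrame, Dataset}
val jsonDataset: Dataset[String] = df.toJSON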

You can then convert it to a DataFrame with:

val jsonDF: DataFrame = jsonDataset.toDF

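The JSON keys appear in alphabetical order here because that is the column order of the DataFrame inferred by read.json, so the output of

jsonDF.show(false)

will be: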

    +--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |value                                                                                                                                                               |
    +--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |{"age":"35","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3","number":"1547869"}],"id":"1","lastname":"blanc","name":"michel","timestamp":"1496756626921"}|
    |{"age":"25","creditcards":[{"id":"ye8","number":"4569872"},{"id":"qe5","number":"3485762"}],"id":"2","lastname":"barns","name":"peter","timestamp":"1496756626551"} |
    +--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
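If you want the keys in the order shown in the question (id, name, lastname, ...) rather than alphabetically, one option is to select the columns in that order before calling toJSON. A minimal sketch; OrderedJson and toJsonInOrder are hypothetical names, and only the top-level columns are reordered, so the fields inside creditcards keep their existing struct order.

```scala
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.col

// Hypothetical helper; names are illustrative only.
object OrderedJson {
  // Selects the top-level columns in `order` before serialising, so the JSON
  // keys follow that order instead of the DataFrame's current column order.
  // Fields nested inside struct columns keep their existing order.
  def toJsonInOrder(df: DataFrame, order: Seq[String]): Dataset[String] =
    df.select(order.map(col): _*).toJSON
}
```

For the question's data this would be called as `OrderedJson.toJsonInOrder(df, Seq("id", "name", "lastname", "age", "timestamp", "creditcards"))`.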

