如何重命名现有列在数组中添加新列?
Posted
技术标签:
【中文标题】如何重命名现有列在数组中添加新列?【英文标题】:How to rename existing columns add new columns in an array? 【发布时间】:2020-06-05 09:48:18 【问题描述】:env:sapk-2.4.5
source.json
"group": "1",
"name": "badboi",
"rank": "3",
"fellows": [
"name": "David",
"age": "25",
"hobby": "code"
,
"name": "John",
"age": "27",
"hobby": "tennis"
,
"name": "Anata",
"age": "23",
"hobby": "dance"
]
我想要的是在每个元素中添加一个新列 'ID'(由 md5 生成,带有 'name' JSON 正文),并在 'fellows' 数组中重命名其他元素的列名,如:
输出.json
"group": "1",
"name": "badboi",
"rank": "3",
"fellows": [
"ID":"6F94AF80FC86BD2DBFAFA9C90BF33522",
"NAME": "David",
"AGE": "25",
"HOBBY": "code"
,
"ID":"CF848467689DD81CAC9E644F8294B641",
"NAME": "John",
"AGE": "27",
"HOBBY": "tennis"
,
"ID":"4F11EBFF1667DDD817921279EEBD5451",
"NAME": "Anata",
"AGE": "23",
"HOBBY": "dance"
]
我的解决方案:
1
我已经尝试使用 'explode' 和 'collect_set' 函数来解决它:
val source = spark.read.option("multiLine", "true").json("/mypath/source.json")
val explode_values = source.select($"group",$"name",$"rank",explode($"fellows").as("explode_fellows"))
val renamedDF = explode_values.select($"group",$"name",$"rank", struct(md5(to_json(struct($"explode_fellows.name".as("NAME")))).as("ID"), $"explode_fellows.name".as("NAME"), $"explode_fellows.age".as("AGE"), $"explode_fellows.HOBBY".as("HOBBY")).as("fellows"))
val result = renamedDF.select($"group", $"name", $"rank", $"fellows").groupBy($"group").agg(first($"name").as("name"),first($"rank").as("rank"), collect_set($"fellows").as("fellows"))
那么结果的架构是:
root
|-- group: string (nullable = true)
|-- name: string (nullable = true)
|-- rank: string (nullable = true)
|-- fellows: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- ID: string (nullable = true)
| | |-- NAME: string (nullable = true)
| | |-- AGE: string (nullable = true)
| | |-- HOBBY: string (nullable = true)
2
使用 'array_zip' 只能重命名列:
val result2 = source.select($"group", $"name", $"rank", arrays_zip($"fellows.name", $"fellows.age", $"fellows.hobby").cast("array<struct<NAME: string, AGE:string, HOBBY:string>>").as("fellows"))
那么结果的架构是:
root
|-- group: string (nullable = true)
|-- name: string (nullable = true)
|-- rank: string (nullable = true)
|-- fellows: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- AGE: string (nullable = true)
| | |-- HOBBY: string (nullable = true)
注意:
“爆炸”和“收集”解决方案不符合我的要求,因为它太复杂了,
或者如果您可以在我的解决方案2 中添加 md5 ID 生成功能会有很大帮助。
如果您能给我一些建议,不胜感激。
【问题讨论】:
【参考方案1】:IIUC,另一种选择应该如下-
1。加载数据
val data =
"""
|
| "group": "1",
| "name": "badboi",
| "rank": "3",
| "fellows": [
|
| "name": "David",
| "age": "25",
| "hobby": "code"
| ,
|
| "name": "John",
| "age": "27",
| "hobby": "tennis"
| ,
|
| "name": "Anata",
| "age": "23",
| "hobby": "dance"
|
| ]
|
""".stripMargin
val df = spark.read.option("multiLine", "true").json(Seq(data).toDS())
df.show(false)
df.printSchema()
/**
* +-----------------------------------------------------------+-----+------+----+
* |fellows |group|name |rank|
* +-----------------------------------------------------------+-----+------+----+
* |[[25, code, David], [27, tennis, John], [23, dance, Anata]]|1 |badboi|3 |
* +-----------------------------------------------------------+-----+------+----+
*
* root
* |-- fellows: array (nullable = true)
* | |-- element: struct (containsNull = true)
* | | |-- age: string (nullable = true)
* | | |-- hobby: string (nullable = true)
* | | |-- name: string (nullable = true)
* |-- group: string (nullable = true)
* |-- name: string (nullable = true)
* |-- rank: string (nullable = true)
*/
2。转换添加 md5(name) 作为 ID 并更改 structField 的大小写
val processedDF = df.withColumn("fellows",
expr("TRANSFORM(fellows, x -> named_struct('ID', md5(to_json(named_struct('ID', x.name))), 'NAME', x.name, 'AGE', x.age, 'HOBBY', x.hobby))"))
processedDF.show(false)
processedDF.printSchema()
/**
* +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
* |fellows |group|name |rank|
* +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
* |[[464e07afc9e46359fb480839150595c5, David, 25, code], [61409aa1fd47d4a5332de23cbf59a36f, John, 27, tennis], [540356fa1779480b07d0743763c78159, Anata, 23, dance]]|1 |badboi|3 |
* +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
*
* root
* |-- fellows: array (nullable = true)
* | |-- element: struct (containsNull = false)
* | | |-- ID: string (nullable = true)
* | | |-- NAME: string (nullable = true)
* | | |-- AGE: string (nullable = true)
* | | |-- HOBBY: string (nullable = true)
* |-- group: string (nullable = true)
* |-- name: string (nullable = true)
* |-- rank: string (nullable = true)
*/
processedDF.toJSON.show(false)
//
// "fellows": [
// "ID": "464e07afc9e46359fb480839150595c5",
// "NAME": "David",
// "AGE": "25",
// "HOBBY": "code"
// ,
// "ID": "61409aa1fd47d4a5332de23cbf59a36f",
// "NAME": "John",
// "AGE": "27",
// "HOBBY": "tennis"
// ,
// "ID": "540356fa1779480b07d0743763c78159",
// "NAME": "Anata",
// "AGE": "23",
// "HOBBY": "dance"
// ],
// "group": "1",
// "name": "badboi",
// "rank": "3"
//
【讨论】:
感谢您的回答。请注意,md5 源是一个 json 字符串,例如:"name": "David,因此您的 'ID' 与我的不同。 是的,你能做出改变吗?另外,如果有帮助,请接受+upvote 我已经进行了更改。再次感谢您的帮助。以上是关于如何重命名现有列在数组中添加新列?的主要内容,如果未能解决你的问题,请参考以下文章
如何删除一列并创建一个新列而不是在 EF Core 中重命名?