How to rename existing columns and add new columns in an array?

【Posted】2020-06-05 09:48:18 【Question】:

env: spark-2.4.5

source.json

{
    "group": "1",
    "name": "badboi",
    "rank": "3",
    "fellows": [
        {
            "name": "David",
            "age": "25",
            "hobby": "code"
        },
        {
            "name": "John",
            "age": "27",
            "hobby": "tennis"
        },
        {
            "name": "Anata",
            "age": "23",
            "hobby": "dance"
        }
    ]
}

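What I want is to add a new column 'ID' to each element of the 'fellows' array (an md5 hash of a JSON body containing 'name'), and to rename the other columns of those elements, like this: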

output.json

{
    "group": "1",
    "name": "badboi",
    "rank": "3",
    "fellows": [
        {
            "ID":"6F94AF80FC86BD2DBFAFA9C90BF33522",
            "NAME": "David",
            "AGE": "25",
            "HOBBY": "code"
        },
        {
            "ID":"CF848467689DD81CAC9E644F8294B641",
            "NAME": "John",
            "AGE": "27",
            "HOBBY": "tennis"
        },
        {
            "ID":"4F11EBFF1667DDD817921279EEBD5451",
            "NAME": "Anata",
            "AGE": "23",
            "HOBBY": "dance"
        }
    ]
}

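To check IDs like the ones above outside Spark, a minimal plain-Scala sketch (JDK only) can hash the compact JSON text of a `name` field with MD5 and upper-case the hex digits. It assumes the hashed text is exactly `{"name":"David"}` with no spaces, which is how Spark's `to_json` serializes a single-field struct; whether it reproduces the exact IDs in output.json depends on the JSON text (including the key's casing) that was actually hashed, which the question does not pin down. `FellowId`, `nameJson`, `md5UpperHex` and `id` are hypothetical names.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object FellowId {
  // Compact JSON for a single-field object, e.g. {"name":"David"}.
  // Assumption: names contain no characters that need JSON escaping.
  def nameJson(name: String): String = s"""{"name":"$name"}"""

  // MD5 of the UTF-8 bytes, hex-encoded with upper-case digits.
  def md5UpperHex(s: String): String =
    MessageDigest.getInstance("MD5")
      .digest(s.getBytes(StandardCharsets.UTF_8))
      .map(b => f"${b & 0xff}%02X")
      .mkString

  // Hypothetical ID: upper-case MD5 of {"name":"<name>"}.
  def id(name: String): String = md5UpperHex(nameJson(name))

  def main(args: Array[String]): Unit =
    Seq("David", "John", "Anata").foreach(n => println(s"$n -> ${id(n)}"))
}
```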
My solutions:

Solution 1: I tried solving it with the 'explode' and 'collect_set' functions:

  import org.apache.spark.sql.functions._
  import spark.implicits._

  val source = spark.read.option("multiLine", "true").json("/mypath/source.json")
  val explode_values = source.select($"group", $"name", $"rank", explode($"fellows").as("explode_fellows"))
  val renamedDF = explode_values.select($"group", $"name", $"rank",
    struct(md5(to_json(struct($"explode_fellows.name".as("NAME")))).as("ID"), $"explode_fellows.name".as("NAME"),
      $"explode_fellows.age".as("AGE"), $"explode_fellows.hobby".as("HOBBY")).as("fellows"))

  val result = renamedDF.groupBy($"group").agg(first($"name").as("name"), first($"rank").as("rank"), collect_set($"fellows").as("fellows"))

The resulting schema is:

root
 |-- group: string (nullable = true)
 |-- name: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- fellows: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ID: string (nullable = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- AGE: string (nullable = true)
 |    |    |-- HOBBY: string (nullable = true)

Solution 2: with 'arrays_zip' I can only rename the columns:

val result2 = source.select($"group", $"name", $"rank", arrays_zip($"fellows.name", $"fellows.age", $"fellows.hobby").cast("array<struct<NAME: string, AGE:string, HOBBY:string>>").as("fellows"))

The resulting schema is:

root
 |-- group: string (nullable = true)
 |-- name: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- fellows: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- AGE: string (nullable = true)
 |    |    |-- HOBBY: string (nullable = true)

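Note:

The 'explode' and 'collect' solution does not meet my requirements because it is too complicated, so it would be a great help if you could add the md5 ID generation to my solution 2.

Any advice would be greatly appreciated.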

【Answer 1】:

IIUC, an alternative is the following:

1. Load the data

   val data =
      """
        |{
        |    "group": "1",
        |    "name": "badboi",
        |    "rank": "3",
        |    "fellows": [
        |        {
        |            "name": "David",
        |            "age": "25",
        |            "hobby": "code"
        |        },
        |        {
        |            "name": "John",
        |            "age": "27",
        |            "hobby": "tennis"
        |        },
        |        {
        |            "name": "Anata",
        |            "age": "23",
        |            "hobby": "dance"
        |        }
        |    ]
        |}
      """.stripMargin

    import spark.implicits._ // provides .toDS() for Seq(data)
    val df = spark.read.option("multiLine", "true").json(Seq(data).toDS())
    df.show(false)
    df.printSchema()

    /**
      * +-----------------------------------------------------------+-----+------+----+
      * |fellows                                                    |group|name  |rank|
      * +-----------------------------------------------------------+-----+------+----+
      * |[[25, code, David], [27, tennis, John], [23, dance, Anata]]|1    |badboi|3   |
      * +-----------------------------------------------------------+-----+------+----+
      *
      * root
      * |-- fellows: array (nullable = true)
      * |    |-- element: struct (containsNull = true)
      * |    |    |-- age: string (nullable = true)
      * |    |    |-- hobby: string (nullable = true)
      * |    |    |-- name: string (nullable = true)
      * |-- group: string (nullable = true)
      * |-- name: string (nullable = true)
      * |-- rank: string (nullable = true)
      */

2. Use TRANSFORM to add md5(name) as ID and change the case of the struct fields

 val processedDF = df.withColumn("fellows",
      expr("TRANSFORM(fellows, x -> named_struct('ID', md5(to_json(named_struct('ID', x.name))), 'NAME', x.name, 'AGE', x.age, 'HOBBY', x.hobby))"))
    processedDF.show(false)
    processedDF.printSchema()

    /**
      * +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
      * |fellows                                                                                                                                                          |group|name  |rank|
      * +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
      * |[[464e07afc9e46359fb480839150595c5, David, 25, code], [61409aa1fd47d4a5332de23cbf59a36f, John, 27, tennis], [540356fa1779480b07d0743763c78159, Anata, 23, dance]]|1    |badboi|3   |
      * +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
      *
      * root
      * |-- fellows: array (nullable = true)
      * |    |-- element: struct (containsNull = false)
      * |    |    |-- ID: string (nullable = true)
      * |    |    |-- NAME: string (nullable = true)
      * |    |    |-- AGE: string (nullable = true)
      * |    |    |-- HOBBY: string (nullable = true)
      * |-- group: string (nullable = true)
      * |-- name: string (nullable = true)
      * |-- rank: string (nullable = true)
      */
    processedDF.toJSON.show(false)

//    {
//      "fellows": [{
//      "ID": "464e07afc9e46359fb480839150595c5",
//      "NAME": "David",
//      "AGE": "25",
//      "HOBBY": "code"
//    }, {
//      "ID": "61409aa1fd47d4a5332de23cbf59a36f",
//      "NAME": "John",
//      "AGE": "27",
//      "HOBBY": "tennis"
//    }, {
//      "ID": "540356fa1779480b07d0743763c78159",
//      "NAME": "Anata",
//      "AGE": "23",
//      "HOBBY": "dance"
//    }],
//      "group": "1",
//      "name": "badboi",
//      "rank": "3"
//    }
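In step 2 above, the hashed text is `to_json(named_struct('ID', x.name))`, i.e. `{"ID":"David"}`, while the asker's comment says the md5 input should be a JSON string like `{"name":"David"}`; also, Spark's `md5` returns lower-case hex whereas output.json shows upper-case IDs. A minimal sketch of the same TRANSFORM approach, assuming both of those requirements, keys the inner `named_struct` by `'name'` and wraps the hash in `upper`. It needs Spark 2.4+ (for the `TRANSFORM` higher-order function) on the classpath; `FellowsIdSketch` and `addIds` are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr

object FellowsIdSketch {
  // Rewrites every element of `fellows`: adds an upper-case MD5 "ID" computed
  // from the JSON text {"name":"<name>"} and upper-cases the field names.
  // Assumption: the md5 input must be keyed by lower-case 'name'.
  def addIds(df: DataFrame): DataFrame =
    df.withColumn(
      "fellows",
      expr(
        """TRANSFORM(fellows, x -> named_struct(
          |  'ID',    upper(md5(to_json(named_struct('name', x.name)))),
          |  'NAME',  x.name,
          |  'AGE',   x.age,
          |  'HOBBY', x.hobby))""".stripMargin))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("fellows-id-sketch").getOrCreate()
    import spark.implicits._

    // Same data as source.json, on one line so no multiLine option is needed.
    val json =
      """{"group":"1","name":"badboi","rank":"3","fellows":[""" +
        """{"name":"David","age":"25","hobby":"code"},""" +
        """{"name":"John","age":"27","hobby":"tennis"},""" +
        """{"name":"Anata","age":"23","hobby":"dance"}]}"""

    val result = addIds(spark.read.json(Seq(json).toDS()))
    result.printSchema()
    result.toJSON.collect().foreach(println)
    spark.stop()
  }
}
```

Because `TRANSFORM` rewrites the array in place, there is no explode/groupBy round trip, so the element order of `fellows` is preserved and duplicate elements are not collapsed as they would be by `collect_set`.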

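【Discussion】:

Thanks for your answer. Please note that the md5 source is a JSON string such as {"name": "David"}, so your 'ID' is different from mine.

Yes, could you make the change? Also, if this helped, please accept + upvote.

I have made the change.

Thanks again for your help.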
