想将key添加到pyspark dataFrame的爆炸数组中

Posted

技术标签:

【中文标题】想将key添加到pyspark dataFrame的爆炸数组中【英文标题】:Want to add the key to the exploded array of the pyspark dataFrame 【发布时间】:2020-07-02 11:59:57 【问题描述】:

我有一个 JSON 格式,我将其转换为 Pyspark 数据框。 JSON 对象包含一个我正在爆炸的数组,然后我正在使用 select 查询数据。 在那我想做两件事。 第一的: 检查 iF 数组中是否存在任何特定的键:(我已经使用 Infer Schema 成功完成了) 第二 我想要做的是,如果键不存在,我想添加键和值可以为 null 没关系(因为它不会在选择查询时给我错误)

我已经尝试过 Functions.lit() 也尝试了很多其他的东西使用列数组追加等,但它没有工作

我的 sn-p 示例是

示例 JSON 格式


    "participants": [
            "flaggedR": "null",
            "participantId": "80d-1961-4e85",
            "participantName": "XYZ",
            "purpose": "external"
        ,
        
            "flaggedR": "null",
            "participantId": "909ba80d-1961",
            "participantName": "ABC",
            "purpose": "external"
        
    ]

首先,我在推断架构后对其进行了爆炸 df.select(explode('participants').alias('p')) 现在 我必须找到 IsWrap-Up 密钥存在于数组中 我有它使用 使用完成 df.schema.simpleString().find("IsWrap-Up") (如果不存在则返回 -1 或如果存在则返回索引) 现在如果它返回 -1 我想在数组中添加键“IsWrap-Up”。 我试过使用 f.lit() array_contacte 或使用火花的附加方法但它没有

【问题讨论】:

嗨@Shubham,如果答案对您有帮助,您可以接受它作为答案(单击答案旁边的复选标记将其从灰色切换为已填充。)。这对其他社区成员可能是有益的。谢谢。 【参考方案1】:

spark>=2.4

试试这个 -

加载提供的测试数据

 val data =
      """
        |
        |    "participants": [
        |            "flaggedR": "null",
        |            "participantId": "80d-1961-4e85",
        |            "participantName": "XYZ",
        |            "purpose": "external"
        |        ,
        |        
        |            "flaggedR": "null",
        |            "participantId": "909ba80d-1961",
        |            "participantName": "ABC",
        |            "purpose": "external"
        |        
        |    ]
        |
      """.stripMargin
    val df = spark.read
      .option("multiLine", true)
      .json(Seq(data).toDS())
    df.show(false)
    df.printSchema()
    /**
      * +----------------------------------------------------------------------------+
      * |participants                                                                |
      * +----------------------------------------------------------------------------+
      * |[[null, 80d-1961-4e85, XYZ, external], [null, 909ba80d-1961, ABC, external]]|
      * +----------------------------------------------------------------------------+
      *
      * root
      * |-- participants: array (nullable = true)
      * |    |-- element: struct (containsNull = true)
      * |    |    |-- flaggedR: string (nullable = true)
      * |    |    |-- participantId: string (nullable = true)
      * |    |    |-- participantName: string (nullable = true)
      * |    |    |-- purpose: string (nullable = true)
      */

在每个结构元素中添加IsWrap-Up

      
    val p = df.withColumn("participants", expr("TRANSFORM(participants, " +
      "x ->named_struct('flaggedR', x.flaggedR, 'participantId', x.participantId," +
      " 'participantName', x.participantName, 'purpose',  x.purpose, 'IsWrap-Up', null))"))
    p.show(false)
    p.printSchema()

    /**
      * +------------------------------------------------------------------------------+
      * |participants                                                                  |
      * +------------------------------------------------------------------------------+
      * |[[null, 80d-1961-4e85, XYZ, external,], [null, 909ba80d-1961, ABC, external,]]|
      * +------------------------------------------------------------------------------+
      *
      * root
      * |-- participants: array (nullable = true)
      * |    |-- element: struct (containsNull = false)
      * |    |    |-- flaggedR: string (nullable = true)
      * |    |    |-- participantId: string (nullable = true)
      * |    |    |-- participantName: string (nullable = true)
      * |    |    |-- purpose: string (nullable = true)
      * |    |    |-- IsWrap-Up: null (nullable = true)
      */

【讨论】:

感谢 Someshwar 的回复,看起来很干净,让我试着回复你:)

以上是关于想将key添加到pyspark dataFrame的爆炸数组中的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:当列是列表时,将列添加到 DataFrame

如何将标头添加到 PySpark DataFrame?

如何将字典中的多个值添加到 PySpark Dataframe

将具有最接近值的列添加到 PySpark Dataframe

使用 pyspark 将 json 文件读入 RDD(不是 dataFrame)

Pyspark Dataframe 将条件添加到`reduce(add,(F.col(x) ... `