Pyspark Json 结构

Posted

技术标签:

【中文标题】Pyspark Json 结构【英文标题】:Pyspark Json Struct 【发布时间】:2022-01-09 02:37:58 【问题描述】:

我正在尝试从 DF 创建 1 个 json,其中 1 个客户有 3 个条目,


+----------+---------------+---------+-----------------+-----------+---------------+---------+-----------------+--------------------+------------------+------+
|CustomerId|EmailPreference|EmailType|AddressPreference|AddressType|PhonePreference|PhoneType|        attribute|                from|                to|action|
+----------+---------------+---------+-----------------+-----------+---------------+---------+-----------------+--------------------+------------------+------+
|C1000001|        Primary|     Home|             null|       null|           null|     null|     EmailAddress|TEST@Solutions.com|WELL@Solutions.com|UPDATE|
|C1000001|           null|     null|             null|       null|        Primary|     Home|      PhoneNumber|          8177777777|        8168888888|UPDATE|
|C1000001|           null|     null|             null|       null|        Primary|     Home|FormatPhoneNumber|       (816)777-7777|     (816)888-8888|UPDATE|
+----------+---------------+---------+-----------------+-----------+---------------+---------+-----------------+--------------------+------------------+------+

这是我们的更新DF。因此,我试图以这样一种方式创建一个结构,即我们需要 1 个客户的 1 个 json 条目。因此,对于 1 位客户,这里有 3 次更新。所以这就是我尝试过的,

json_df = updatesDF.select(
      F.col("CustomerId").alias("CustomerId"),
   #   "action",
      "PhonePreference",
      "EmailPreference",
      
      F.struct(
        F.col("PhoneType"),
        F.col("PhonePreference"),
        F.col("Attribute"),
        F.col("From"),
        F.col("To"),
      ).alias("PhoneDetails"),

      F.struct(
        F.col("EmailType"),
        F.col("EmailPreference"),  
        F.col("Attribute"),
        F.col("From"),
        F.col("To"),
        ).alias("EmailDetails"),
    ).groupBy(
    "CustomerId",
      "PhonePreference",
    "EmailPreference",
    ).agg(
    F.collect_list("PhoneDetails").alias("PhoneDetails"),
    F.collect_list("EmailDetails").alias("EmailDetails"),)

所以这段代码的问题是,我得到 2 个 jsons 作为输出,我需要一个输出给 1 个客户

能否请您帮忙,以便最终我得到 1 个 json,其中包含 emaildetails 下的所有电子邮件更改以及 phonedetails 下的所有电话更改

【问题讨论】:

能否添加创建示例数据框的代码? 请修改您的帖子标题以提出明确、具体的问题。见How to Ask。 【参考方案1】:

您得到 2 行,因为您的 groupBy 有两个独特的组合。

(C1000001, null, Primary) (C1000001, Primary, null)

一种解决方法是您只使用groupBy CustomerId 并在聚合中应用first("EmailPreference", ignorenulls=True)first("PhonePreference", ignorenulls=True)

【讨论】:

【参考方案2】:

您已经完成了一半,您所要做的就是再分组一次,仅使用 CustomerID,并将 PhoneDetailsEmailDetails 添加到另一个结构中

    .select(
        'CustomerId',
        F.struct(
            F.col('PhoneDetails'),
            F.col('EmailDetails')
        ).alias('updates')
    )
    .groupBy('CustomerId')
    .agg(F.collect_list('updates').alias('updates'))

【讨论】:

以上是关于Pyspark Json 结构的主要内容,如果未能解决你的问题,请参考以下文章

使用 Pyspark 处理 JSON 结构

Pyspark 将嵌套结构字段转换为 Json 字符串

使用 pyspark 从数据框创建 json 结构

kafka 到 pyspark 结构化流,将 json 解析为数据帧

如何在pyspark上更改JSON结构?

如何在pyspark中将rdd行转换为带有json结构的数据框?