Pyspark Json 结构
Posted
技术标签:
【中文标题】Pyspark Json 结构【英文标题】:Pyspark Json Struct 【发布时间】:2022-01-09 02:37:58 【问题描述】:我正在尝试从 DF 创建 1 个 json,其中 1 个客户有 3 个条目,
+----------+---------------+---------+-----------------+-----------+---------------+---------+-----------------+--------------------+------------------+------+
|CustomerId|EmailPreference|EmailType|AddressPreference|AddressType|PhonePreference|PhoneType| attribute| from| to|action|
+----------+---------------+---------+-----------------+-----------+---------------+---------+-----------------+--------------------+------------------+------+
|C1000001| Primary| Home| null| null| null| null| EmailAddress|TEST@Solutions.com|WELL@Solutions.com|UPDATE|
|C1000001| null| null| null| null| Primary| Home| PhoneNumber| 8177777777| 8168888888|UPDATE|
|C1000001| null| null| null| null| Primary| Home|FormatPhoneNumber| (816)777-7777| (816)888-8888|UPDATE|
+----------+---------------+---------+-----------------+-----------+---------------+---------+-----------------+--------------------+------------------+------+
这是我们的更新DF。因此,我试图以这样一种方式创建一个结构,即我们需要 1 个客户的 1 个 json 条目。因此,对于 1 位客户,这里有 3 次更新。所以这就是我尝试过的,
json_df = updatesDF.select(
F.col("CustomerId").alias("CustomerId"),
# "action",
"PhonePreference",
"EmailPreference",
F.struct(
F.col("PhoneType"),
F.col("PhonePreference"),
F.col("Attribute"),
F.col("From"),
F.col("To"),
).alias("PhoneDetails"),
F.struct(
F.col("EmailType"),
F.col("EmailPreference"),
F.col("Attribute"),
F.col("From"),
F.col("To"),
).alias("EmailDetails"),
).groupBy(
"CustomerId",
"PhonePreference",
"EmailPreference",
).agg(
F.collect_list("PhoneDetails").alias("PhoneDetails"),
F.collect_list("EmailDetails").alias("EmailDetails"),)
所以这段代码的问题是,我得到 2 个 jsons 作为输出,我需要一个输出给 1 个客户
能否请您帮忙,以便最终我得到 1 个 json,其中包含 emaildetails 下的所有电子邮件更改以及 phonedetails 下的所有电话更改
【问题讨论】:
能否添加创建示例数据框的代码? 请修改您的帖子标题以提出明确、具体的问题。见How to Ask。 【参考方案1】:您得到 2 行,因为您的 groupBy
有两个独特的组合。
(C1000001, null, Primary)
(C1000001, Primary, null)
一种解决方法是您只使用groupBy
CustomerId
并在聚合中应用first("EmailPreference", ignorenulls=True)
和first("PhonePreference", ignorenulls=True)
【讨论】:
【参考方案2】:您已经完成了一半,您所要做的就是再分组一次,仅使用 CustomerID
,并将 PhoneDetails
和 EmailDetails
添加到另一个结构中
.select(
'CustomerId',
F.struct(
F.col('PhoneDetails'),
F.col('EmailDetails')
).alias('updates')
)
.groupBy('CustomerId')
.agg(F.collect_list('updates').alias('updates'))
【讨论】:
以上是关于Pyspark Json 结构的主要内容,如果未能解决你的问题,请参考以下文章