如何在 pyspark 数据帧上应用 group by 并对结果对象进行转换

Posted

技术标签:

【中文标题】如何在 pyspark 数据帧上应用 group by 并对结果对象进行转换【英文标题】:How to apply group by on pyspark dataframe and a transformation on the resulting object 【发布时间】:2020-06-24 14:02:13 【问题描述】:

我有一个 spark 数据框

| item_id | attribute_key| attribute_value
____________________________________________________________________________
| id_1        brand          Samsung
| id_1        ram            6GB
| id_2        brand          Apple
| id_2        ram            4GB
_____________________________________________________________________________

我想将此数据框按item_id 分组并输出为一个文件,其中每一行都是一个json 对象

id_1: "properties":["brand":['Samsung'],"ram":['6GB'] ]
id_2: "properties":["brand":['Apple'],"ram":['4GB'] ]

这是一个大的分布式数据框,因此不能转换为 pandas。 这种转换在 pyspark 中是否可行

【问题讨论】:

【参考方案1】:

在scala中,但是python版本会很相似(sql.functions):

val df = Seq((1,"brand","Samsung"),(1,"ram","6GB"),(1,"ram","8GB"),(2,"brand","Apple"),(2,"ram","6GB")).toDF("item_id","attribute_key","attribute_value")

+-------+-------------+---------------+
|item_id|attribute_key|attribute_value|
+-------+-------------+---------------+
|      1|        brand|        Samsung|
|      1|          ram|            6GB|
|      1|          ram|            8GB|
|      2|        brand|          Apple|
|      2|          ram|            6GB|
+-------+-------------+---------------+

df.groupBy('item_id,'attribute_key)
.agg(collect_list('attribute_value).as("list2"))
.groupBy('item_id)
.agg(map(lit("properties"),collect_list(map('attribute_key,'list2))).as("prop"))
.select(to_json(map('item_id,'prop)).as("json"))
.show(false)

输出:

+------------------------------------------------------------------+
|json                                                              |
+------------------------------------------------------------------+
|"1":"properties":["ram":["6GB","8GB"],"brand":["Samsung"]]|
|"2":"properties":["brand":["Apple"],"ram":["6GB"]]        |
+------------------------------------------------------------------+

【讨论】:

以上是关于如何在 pyspark 数据帧上应用 group by 并对结果对象进行转换的主要内容,如果未能解决你的问题,请参考以下文章

在 pyspark 中的数据帧上应用 udf 后出错

如何在 pyspark 中的数据帧上使用 fuzz.ratio

pyspark 数据帧上的复杂逻辑,包括前一行现有值以及动态生成的前一行值

在 pyspark 数据帧上减少和 Lambda

pyspark 数据帧上的向量操作

数据帧上的 spark GROUPED_MAP udf 是不是并行运行?