如何按列对pyspark中的数据框进行分组并以该列作为键并以记录列表作为其值来获取字典?

Posted

技术标签:

【中文标题】如何按列对pyspark中的数据框进行分组并以该列作为键并以记录列表作为其值来获取字典?【英文标题】:How to groupby a data frame in pyspark by a column and get a dictionary with that column as key and list of records as its value? 【发布时间】:2021-04-08 18:21:54 【问题描述】:

我有一个这样的数据框 -

-RECORD 0-------------------------------------------

 id                          | 11           
 order_number                | 254                  
 order_date                  | 2021-03-09           
 store_id                    | abc6            
 employee_code               | 6921_abc40    
 customer_name               | harvey 
 contact_number              | 353          
 address                     | foo 
 locality                    | foo               
 postal_code                 | 5600082332             
 order_info                  | info
 amount                      | 478.8                
 payment_type                | null                 
 timeA                       | 2021-03-10 01:34:26
 timeB                       | 2021-03-10 01:35:26  
             
-RECORD 1-------------------------------------------

 id                          | 12            
 order_number                | 2272                 
 order_date                  | 2021-03-09           
 store_id                    | abc666             
 employee_code               | 66_abc55               
 customer_name               | mike        
 contact_number              | 98          
 address                     | bar
 locality                    | bar
 postal_code                 | 11000734332              
 order_info                  | info
 amount_to_be_collected      | 0.34                 
 payment_type                | null                 
 timeA                       | 2021-03-10 00:18:04  
 timeB                       | 2021-03-10 03:21:06  
 
  

我想做以下事情-

按employee_code 对记录进行分组并得到一个字典作为回报,这将是这样的 -

"emp_code": [Record0, Record1, ....]

即,员工代码作为键,该员工的所有记录列表作为值。

我正在为此编写一个 Gluejob。我可以通过循环遍历所有记录并获取所需的字典以编程方式完成此操作,但这将花费大量时间。我想知道是否有办法通过使用一些更高阶的pyspark函数来实现这个结果?

【问题讨论】:

【参考方案1】:

使用地图

您可以创建一个映射,该映射具有基于employee_code 的键和一个结构或数组作为值:

df = df.select(map(col("employee_code"), struct("order_number", "order_date",,)).alias("complex_map"))

然后可以使用selectExpr作为地图查询:

df.selectExpr("complex_map['employee_code']").show(2)

结构的替代方案:

为此,您需要在 grouping by 它们之前对 ComplexTypes 进行一些转换,这基本上将结构从...转换为:

DataFrame[order_number: string, employee_code: string, ....]>

变成这样:

DataFrame[employee_code: string, complex: struct<order_number:string,contact_number:int>]>

这可以使用from pyspark.sql.functions import struct 中的struct 函数来完成:

from pyspark.sql.functions import struct

df.select(col("employee_code"), struct("order_number", "order_date", ...).alias("orders"))

一旦你将它们放在那种结构中,你就可以执行分组并使用聚合函数 collect_list:

from pyspark.sql.functions import struct, collect_list

df.select(col("employee_code"), struct("order_number", "order_date", ...).alias("orders")).groupBy("employee_code").agg(collect_list("orders").alias("orders")

然后您可以在结构中选择单个列:

df.select(col("orders.order_number"))

甚至通过以下方式过滤它们:

df.select(col("employee_code")).where(col("orders.order_number") > 100)

如果您想回到原来的状态,请查看 explode 函数,该函数采用一列数组并创建一行(其余值重复)

【讨论】:

您好,如果我的记录有大约 100 列,那么在使用 struct 函数时,我是否必须在执行此操作时明确提及所有列的名称? 然后可以使用selectExpr("select *) api

以上是关于如何按列对pyspark中的数据框进行分组并以该列作为键并以记录列表作为其值来获取字典?的主要内容,如果未能解决你的问题,请参考以下文章

按列对分组数据帧进行采样

Python - 读取 csv 并按列对数据进行分组

使用 NaN 在 pandas 中按列对数据进行 Winsorizing

php 按列对SQL结果进行分组

如何在 PySpark 中进行分组并查找列的唯一项目 [重复]

PySpark Python使用列对数据框进行排序