如何按列对pyspark中的数据框进行分组并以该列作为键并以记录列表作为其值来获取字典?
Posted
技术标签:
【中文标题】如何按列对pyspark中的数据框进行分组并以该列作为键并以记录列表作为其值来获取字典?【英文标题】:How to groupby a data frame in pyspark by a column and get a dictionary with that column as key and list of records as its value? 【发布时间】:2021-04-08 18:21:54 【问题描述】:我有一个这样的数据框 -
-RECORD 0-------------------------------------------
id | 11
order_number | 254
order_date | 2021-03-09
store_id | abc6
employee_code | 6921_abc40
customer_name | harvey
contact_number | 353
address | foo
locality | foo
postal_code | 5600082332
order_info | info
amount | 478.8
payment_type | null
timeA | 2021-03-10 01:34:26
timeB | 2021-03-10 01:35:26
-RECORD 1-------------------------------------------
id | 12
order_number | 2272
order_date | 2021-03-09
store_id | abc666
employee_code | 66_abc55
customer_name | mike
contact_number | 98
address | bar
locality | bar
postal_code | 11000734332
order_info | info
amount_to_be_collected | 0.34
payment_type | null
timeA | 2021-03-10 00:18:04
timeB | 2021-03-10 03:21:06
我想做以下事情-
按employee_code 对记录进行分组并得到一个字典作为回报,这将是这样的 -
"emp_code": [Record0, Record1, ....]
即,员工代码作为键,该员工的所有记录列表作为值。
我正在为此编写一个 Gluejob。我可以通过循环遍历所有记录并获取所需的字典以编程方式完成此操作,但这将花费大量时间。我想知道是否有办法通过使用一些更高阶的pyspark函数来实现这个结果?
【问题讨论】:
【参考方案1】:使用地图
您可以创建一个映射,该映射具有基于employee_code
的键和一个结构或数组作为值:
df = df.select(map(col("employee_code"), struct("order_number", "order_date",,)).alias("complex_map"))
然后可以使用selectExpr
作为地图查询:
df.selectExpr("complex_map['employee_code']").show(2)
结构的替代方案:
为此,您需要在 grouping by
它们之前对 ComplexTypes 进行一些转换,这基本上将结构从...转换为:
DataFrame[order_number: string, employee_code: string, ....]>
变成这样:
DataFrame[employee_code: string, complex: struct<order_number:string,contact_number:int>]>
这可以使用from pyspark.sql.functions import struct
中的struct
函数来完成:
from pyspark.sql.functions import struct
df.select(col("employee_code"), struct("order_number", "order_date", ...).alias("orders"))
一旦你将它们放在那种结构中,你就可以执行分组并使用聚合函数 collect_list:
from pyspark.sql.functions import struct, collect_list
df.select(col("employee_code"), struct("order_number", "order_date", ...).alias("orders")).groupBy("employee_code").agg(collect_list("orders").alias("orders")
然后您可以在结构中选择单个列:
df.select(col("orders.order_number"))
甚至通过以下方式过滤它们:
df.select(col("employee_code")).where(col("orders.order_number") > 100)
如果您想回到原来的状态,请查看 explode
函数,该函数采用一列数组并创建一行(其余值重复)
【讨论】:
您好,如果我的记录有大约 100 列,那么在使用 struct 函数时,我是否必须在执行此操作时明确提及所有列的名称? 然后可以使用selectExpr("select *) api以上是关于如何按列对pyspark中的数据框进行分组并以该列作为键并以记录列表作为其值来获取字典?的主要内容,如果未能解决你的问题,请参考以下文章
使用 NaN 在 pandas 中按列对数据进行 Winsorizing