How to reduceByKey in PySpark with custom grouping of rows?
Posted: 2019-05-22 09:31:28

I have a dataframe that looks like this:
items_df
======================================================
| customer   item_type   brand      price   quantity |
|====================================================|
| 1          bread       reems      20      10       |
| 2          butter      spencers   10      21       |
| 3          jam         niles      10      22       |
| 1          bread       marks      16      18       |
| 1          butter      jims       19      12       |
| 1          jam         jills      16      6        |
| 2          bread       marks      16      18       |
======================================================
I created an rdd that converts the rows above into dicts:
rdd = items_df.rdd.map(lambda row: row.asDict())
The result looks like this:
[
    {"customer": 1, "item_type": "bread", "brand": "reems", "price": 20, "quantity": 10},
    {"customer": 2, "item_type": "butter", "brand": "spencers", "price": 10, "quantity": 21},
    {"customer": 3, "item_type": "jam", "brand": "niles", "price": 10, "quantity": 22},
    {"customer": 1, "item_type": "bread", "brand": "marks", "price": 16, "quantity": 18},
    {"customer": 1, "item_type": "butter", "brand": "jims", "price": 19, "quantity": 12},
    {"customer": 1, "item_type": "jam", "brand": "jills", "price": 16, "quantity": 6},
    {"customer": 2, "item_type": "bread", "brand": "marks", "price": 16, "quantity": 18}
]
I want to first group the rows above by customer. Then I want to introduce custom new keys "breads", "butters", "jams" and group all the matching rows for that customer under them. So my rdd goes from 7 rows down to 3 rows.

The output would look like this:
[
    {
        "customer": 1,
        "breads": [
            {"item_type": "bread", "brand": "reems", "price": 20, "quantity": 10},
            {"item_type": "bread", "brand": "marks", "price": 16, "quantity": 18}
        ],
        "butters": [
            {"item_type": "butter", "brand": "jims", "price": 19, "quantity": 12}
        ],
        "jams": [
            {"item_type": "jam", "brand": "jills", "price": 16, "quantity": 6}
        ]
    },
    {
        "customer": 2,
        "breads": [
            {"item_type": "bread", "brand": "marks", "price": 16, "quantity": 18}
        ],
        "butters": [
            {"item_type": "butter", "brand": "spencers", "price": 10, "quantity": 21}
        ],
        "jams": []
    },
    {
        "customer": 3,
        "breads": [],
        "butters": [],
        "jams": [
            {"item_type": "jam", "brand": "niles", "price": 10, "quantity": 22}
        ]
    }
]
Does anyone know how to achieve the above with PySpark? I would like to know if there is a solution using reduceByKey() or something similar. I would like to avoid groupByKey() if possible.
Answer 1:

First add an item_types column to pivot the dataframe on.
from pyspark.sql import functions as F  # alias used throughout this answer

# Append 's' to item_type to form the plural column names (breads, butters, jams)
items_df = items_df.withColumn('item_types', F.concat(F.col('item_type'), F.lit('s')))
items_df.show()
+--------+---------+--------+-----+--------+----------+
|customer|item_type| brand|price|quantity|item_types|
+--------+---------+--------+-----+--------+----------+
| 1| bread| reems| 20| 10| breads|
| 2| butter|spencers| 10| 21| butters|
| 3| jam| niles| 10| 22| jams|
| 1| bread| marks| 16| 18| breads|
| 1| butter| jims| 19| 12| butters|
| 1| jam| jills| 16| 6| jams|
| 2| bread| marks| 16| 18| breads|
+--------+---------+--------+-----+--------+----------+
Then you can pivot the table on item_types while grouping by customer, aggregating the other columns into structs with F.collect_list() at the same time.
items_df = items_df.groupby(['customer']).pivot("item_types").agg(
    F.collect_list(F.struct(F.col("item_type"), F.col("brand"), F.col("price"), F.col("quantity")))
).sort('customer')
items_df.show()
items_df.show()
+--------+--------------------+--------------------+--------------------+
|customer| breads| butters| jams|
+--------+--------------------+--------------------+--------------------+
| 1|[[bread, reems, 2...|[[butter, jims, 1...|[[jam, jills, 16,...|
| 2|[[bread, marks, 1...|[[butter, spencer...| []|
| 3| []| []|[[jam, niles, 10,...|
+--------+--------------------+--------------------+--------------------+
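The struct values are truncated in the display above; if you want to see them in full, show() takes a truncate flag (same items_df as above):

# Print the full nested structs instead of the truncated columns above
items_df.show(truncate=False)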
Finally, you need to set recursive=True to convert the nested Rows into dicts.
rdd = items_df.rdd.map(lambda row: row.asDict(recursive=True))
print(rdd.take(10))
[{'customer': 1,
  'breads': [{'item_type': u'bread', 'brand': u'reems', 'price': 20, 'quantity': 10},
             {'item_type': u'bread', 'brand': u'marks', 'price': 16, 'quantity': 18}],
  'butters': [{'item_type': u'butter', 'brand': u'jims', 'price': 19, 'quantity': 12}],
  'jams': [{'item_type': u'jam', 'brand': u'jills', 'price': 16, 'quantity': 6}]},
 {'customer': 2,
  'breads': [{'item_type': u'bread', 'brand': u'marks', 'price': 16, 'quantity': 18}],
  'butters': [{'item_type': u'butter', 'brand': u'spencers', 'price': 10, 'quantity': 21}],
  'jams': []},
 {'customer': 3,
  'breads': [],
  'butters': [],
  'jams': [{'item_type': u'jam', 'brand': u'niles', 'price': 10, 'quantity': 22}]}]
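For completeness, a minimal sketch of the setup these snippets assume: a SparkSession (the name spark here is illustrative) and the sample dataframe from the question.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Hypothetical session name; any existing SparkSession works the same way
spark = SparkSession.builder.appName("items_demo").getOrCreate()

# Sample data taken verbatim from the question
items_df = spark.createDataFrame(
    [
        (1, "bread", "reems", 20, 10),
        (2, "butter", "spencers", 10, 21),
        (3, "jam", "niles", 10, 22),
        (1, "bread", "marks", 16, 18),
        (1, "butter", "jims", 19, 12),
        (1, "jam", "jills", 16, 6),
        (2, "bread", "marks", 16, 18),
    ],
    ["customer", "item_type", "brand", "price", "quantity"],
)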
Comments:

Thanks a lot @giser_yugang! That's a great explanation.

Answer 2:

I also used another approach, using reduceByKey() on the rdd. Given the dataframe items_df, first convert it to an rdd:
rdd = items_df.rdd.map(lambda row: row.asDict())
Transform each row into a tuple (customer, [row_obj]), where row_obj is wrapped in a list:
rdd = rdd.map(lambda row: ( row["customer"], [row] ) )
Use reduceByKey to group by customer, concatenating the lists for a given customer:
rdd = rdd.reduceByKey(lambda x,y: x+y)
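Because the merge function simply concatenates lists, reduceByKey here ends up collecting every row for a customer, merging pairwise starting map-side. A toy illustration of the pairwise merge (plain Python, illustrative values only):

# Illustrative only: reduceByKey applies the merge function to pairs of
# values that share a key, e.g. for customer 1:
merge = lambda x, y: x + y
a = [{"item_type": "bread", "brand": "reems"}]
b = [{"item_type": "bread", "brand": "marks"}]
print(merge(a, b))
# [{'item_type': 'bread', 'brand': 'reems'}, {'item_type': 'bread', 'brand': 'marks'}]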
Convert the tuples back into dicts, where the key is the customer and the value is the list of all associated rows:
rdd = rdd.map(lambda tup: {tup[0]: tup[1]})
Since each customer's data is now contiguous, we can use a custom function to separate the rows into breads, butters, and jams:
def organize_items_in_customer(row):
    # Each row is a single-key dict: {customer_id: [item, item, ...]}
    cust_id = list(row.keys())[0]
    items = row[cust_id]
    new_cust_obj = {"customer": cust_id, "breads": [], "butters": [], "jams": []}
    plurals = {"bread": "breads", "butter": "butters", "jam": "jams"}
    for item in items:
        item_type = item["item_type"]
        key = plurals[item_type]
        new_cust_obj[key].append(item)
    return new_cust_obj
Call the above function to transform the rdd:
rdd = rdd.map(organize_items_in_customer)
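A quick sanity check (a sketch; collect() order is not guaranteed, so sorting by customer keeps the output comparable to Answer 1):

# Collect and print one dict per customer
for cust in sorted(rdd.collect(), key=lambda d: d["customer"]):
    print(cust)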