如何在 PySpark 中使用自定义行分组来 reduceByKey?

Posted

技术标签:

【中文标题】如何在 PySpark 中使用自定义行分组来 reduceByKey?【英文标题】:How to reduceByKey in PySpark with custom grouping of rows? 【发布时间】:2019-05-22 09:31:28 【问题描述】:

我有一个如下所示的数据框:

items_df
======================================================
| customer   item_type    brand    price    quantity |  
|====================================================|
|  1         bread        reems     20         10    |  
|  2         butter       spencers  10         21    |  
|  3         jam          niles     10         22    |
|  1         bread        marks     16         18    |
|  1         butter       jims      19         12    |
|  1         jam          jills     16         6     |
|  2         bread        marks     16         18    |
======================================================

我创建了一个 rdd,将上面的内容转换为 dict:

rdd = items_df.rdd.map(lambda row: row.asDict())

结果如下:

[
    "customer": 1, "item_type": "bread", "brand": "reems", "price": 20, "quantity": 10 ,
    "customer": 2, "item_type": "butter", "brand": "spencers", "price": 10, "quantity": 21 ,
    "customer": 3, "item_type": "jam", "brand": "niles", "price": 10, "quantity": 22 ,
    "customer": 1, "item_type": "bread", "brand": "marks", "price": 16, "quantity": 18 ,
    "customer": 1, "item_type": "butter", "brand": "jims", "price": 19, "quantity": 12 ,
    "customer": 1, "item_type": "jam", "brand": "jills", "price": 16, "quantity": 6 ,
    "customer": 2, "item_type": "bread", "brand": "marks", "price": 16, "quantity": 18 
]

我想先按客户对上述行进行分组。然后我想介绍自定义的新键“面包”、“黄油”、“果酱”,并为该客户对所有这些行进行分组。所以我的 rdd 从 7 行减少到 3 行。

输出如下所示:

[
     
        "customer": 1, 
        "breads": [
            "item_type": "bread", "brand": "reems", "price": 20, "quantity": 10,
            "item_type": "bread", "brand": "marks", "price": 16, "quantity": 18,
        ],
        "butters": [
            "item_type": "butter", "brand": "jims", "price": 19, "quantity": 12
        ],
        "jams": [
            "item_type": "jam", "brand": "jills", "price": 16, "quantity": 6
        ]
    ,
    
        "customer": 2,
        "breads": [
            "item_type": "bread", "brand": "marks", "price": 16, "quantity": 18
        ],
        "butters": [
            "item_type": "butter", "brand": "spencers", "price": 10, "quantity": 21
        ],
        "jams": []
    ,
    
        "customer": 3,
        "breads": [],
        "butters": [],
        "jams": [
            "item_type": "jam", "brand": "niles", "price": 10, "quantity": 22
        ]
    
]

有人知道如何使用 PySpark 实现上述目标吗?我想知道是否有使用 reduceByKey() 或类似方法的解决方案。如果可能,我希望避免使用 groupByKey()。

【问题讨论】:

【参考方案1】:

首先添加一列item_types 来透视数据框。

items_df = items_df.withColumn('item_types', F.concat(F.col('item_type'),F.lit('s')))
items_df.show()

+--------+---------+--------+-----+--------+----------+
|customer|item_type|   brand|price|quantity|item_types|
+--------+---------+--------+-----+--------+----------+
|       1|    bread|   reems|   20|      10|    breads|
|       2|   butter|spencers|   10|      21|   butters|
|       3|      jam|   niles|   10|      22|      jams|
|       1|    bread|   marks|   16|      18|    breads|
|       1|   butter|    jims|   19|      12|   butters|
|       1|      jam|   jills|   16|       6|      jams|
|       2|    bread|   marks|   16|      18|    breads|
+--------+---------+--------+-----+--------+----------+

然后您可以使用customer 组对表进行透视,并同时使用F.collect_list() 聚合其他列。

items_df = items_df.groupby(['customer']).pivot("item_types").agg(
    F.collect_list(F.struct(F.col("item_type"),F.col("brand"), F.col("price"),F.col("quantity")))
).sort('customer')
items_df.show()

+--------+--------------------+--------------------+--------------------+
|customer|              breads|             butters|                jams|
+--------+--------------------+--------------------+--------------------+
|       1|[[bread, reems, 2...|[[butter, jims, 1...|[[jam, jills, 16,...|
|       2|[[bread, marks, 1...|[[butter, spencer...|                  []|
|       3|                  []|                  []|[[jam, niles, 10,...|
+--------+--------------------+--------------------+--------------------+

最后你需要设置recursive=True 将嵌套的Row 转换为dict。

rdd = items_df.rdd.map(lambda row: row.asDict(recursive=True))
print(rdd.take(10))


['customer': 1,
  'breads': ['item_type': u'bread', 'brand': u'reems', 'price': 20, 'quantity': 10,
             'item_type': u'bread', 'brand': u'marks', 'price': 16, 'quantity': 18],
  'butters': ['item_type': u'butter', 'brand': u'jims', 'price': 19, 'quantity': 12],
  'jams': ['item_type': u'jam', 'brand': u'jills', 'price': 16, 'quantity': 6],
 'customer': 2,
  'breads': ['item_type': u'bread', 'brand': u'marks', 'price': 16, 'quantity': 18],
  'butters': ['item_type': u'butter', 'brand': u'spencers', 'price': 10, 'quantity': 21],
  'jams': [],
 'customer': 3,
  'breads': [],
  'butters': [],
  'jams': ['item_type': u'jam', 'brand': u'niles', 'price': 10, 'quantity': 22]]

【讨论】:

非常感谢@giser_yugang!这是一个很好的解释【参考方案2】:

我还使用了另一种方法,在 rdd 中使用了 reduceByKey()。给定数据框 items_df,首先将其转换为 rdd:

rdd = items_df.rdd.map(lambda row: row.asDict())

将每一行转换为具有元组 (customer, [row_obj]),其中我们有 row_obj 在列表中:

rdd = rdd.map(lambda row: ( row["customer"], [row] ) )

使用 reduceByKey 按客户分组,其中列表为给定客户连接:

rdd = rdd.reduceByKey(lambda x,y: x+y)

将元组转换回字典,其中键是客户,值是所有关联行的列表:

rdd = rdd.map(lambda tup:  tup[0]: tup[1]  )

由于每个客户数据现在都是连续的,我们可以使用自定义函数将数据分离为面包、黄油、果酱:

def organize_items_in_customer(row):
    cust_id = list(row.keys())[0]
    items = row[cust_id]
    new_cust_obj =  "customer": cust_id, "breads": [], "butters": [], "jams": [] 
    plurals =  "bread":"breads", "butter":"butters", "jam":"jams" 
    for item in items:
        item_type = item["item_type"]
        key = plurals[item_type]
        new_cust_obj[key].append(item)
    return new_cust_obj

调用上述函数转换rdd:

rdd = rdd.map(organize_items_in_customer)

【讨论】:

以上是关于如何在 PySpark 中使用自定义行分组来 reduceByKey?的主要内容,如果未能解决你的问题,请参考以下文章

如何为 ExtJS GridPanel 实现自定义行排序

如何从 PrepareForReuse 的 UITableView 自定义行中删除视图?

如何在 androidleaback 库中自定义行的标题项?

Pyspark 使用自定义函数

如何在 SwiftUI 中结合可折叠行使用 List 自定义行内部内容?

Pig 使用自定义行/记录分隔符存储文件