使用 groupby 或 aggregate 合并 RDD 或 DataFrame 中每个事务中的项目以进行 FP-growth

Posted

技术标签:

【中文标题】使用 groupby 或 aggregate 合并 RDD 或 DataFrame 中每个事务中的项目以进行 FP-growth【英文标题】:Use groupby or aggregate to merge items in each transaction in RDD or DataFrame to do FP-growth 【发布时间】:2017-08-08 09:46:46 【问题描述】:

我想将具有这种结构的数据框更改为第二个。

+---+-----+-----+
| id|order|items|
+---+-----+-----+
|  0|    a|    1|
|  1|    a|    2|
|  2|    a|    5|
|  3|    b|    1|
|  4|    b|    2|
|  5|    b|    3|
|  6|    b|    5|
|  7|    c|    1|
|  8|    c|    2|
+---+-----+-----+

改成这样:

+---+-----+------------+
| id|order|       items|
+---+-----+------------+
|  0|    a|   [1, 2, 5]|
|  1|    b|[1, 2, 3, 5]|
|  2|    c|      [1, 2]|
+---+-----+------------+

如何在 PySpark 中做到这一点?

【问题讨论】:

【参考方案1】:

你可以的

from pyspark.sql.functions import *
df.groupBy(df.order).agg(collect_list("items").alias("items"))

已编辑

如果你想在 rdd 中做同样的事情,你可以执行以下操作(scala)

rdd.groupBy(x => x._2).mapValues(x => x.map(y => y._3)).zipWithIndex()

给定rdd为

(0,a,1)
(1,a,2)
(2,a,5)
(3,b,1)
(4,b,2)
(5,b,3)
(6,b,5)
(7,c,1)
(8,c,2)

结果是

((a,List(1, 2, 5)),0)
((b,List(1, 2, 3, 5)),1)
((c,List(1, 2)),2)

【讨论】:

【参考方案2】:

Groupby 带有collect_list 函数的订单和带有row_number 的唯一ID 应该适用于您的情况

from pyspark.sql import functions as F
df.groupBy("order").agg(F.collect_list("items"))
   .withColumn("id", F.row_number().over(Window.orderBy("order")))

希望这会有所帮助!

【讨论】:

以上是关于使用 groupby 或 aggregate 合并 RDD 或 DataFrame 中每个事务中的项目以进行 FP-growth的主要内容,如果未能解决你的问题,请参考以下文章

在 SQL 中使用 Group By 和 Aggregate - 获取错误“选择列表中的列无效,因为它不包含在聚合函数或 GROUP BY 中”

提高性能(矢量化?) pandas.groupby.aggregate

SQL --------------- GROUP BY 函数

Pandas GroupBy.agg() 抛出 TypeError: aggregate() 缺少 1 个必需的位置参数:'arg'

Groupby 和 Aggregate 以列表为元素的 pandas 列,并在列表中获取唯一值

pandas使用groupby函数进行分组聚合使用agg函数指定聚合统计计算的数值变量并自定义统计计算结果的名称(naming columns after aggregation)