使用 groupby 或 aggregate 合并 RDD 或 DataFrame 中每个事务中的项目以进行 FP-growth
Posted
技术标签:
【中文标题】使用 groupby 或 aggregate 合并 RDD 或 DataFrame 中每个事务中的项目以进行 FP-growth【英文标题】:Use groupby or aggregate to merge items in each transaction in RDD or DataFrame to do FP-growth 【发布时间】:2017-08-08 09:46:46 【问题描述】:我想将具有这种结构的数据框更改为第二个。
+---+-----+-----+
| id|order|items|
+---+-----+-----+
| 0| a| 1|
| 1| a| 2|
| 2| a| 5|
| 3| b| 1|
| 4| b| 2|
| 5| b| 3|
| 6| b| 5|
| 7| c| 1|
| 8| c| 2|
+---+-----+-----+
改成这样:
+---+-----+------------+
| id|order| items|
+---+-----+------------+
| 0| a| [1, 2, 5]|
| 1| b|[1, 2, 3, 5]|
| 2| c| [1, 2]|
+---+-----+------------+
如何在 PySpark 中做到这一点?
【问题讨论】:
【参考方案1】:你可以的
from pyspark.sql.functions import *
df.groupBy(df.order).agg(collect_list("items").alias("items"))
已编辑
如果你想在 rdd 中做同样的事情,你可以执行以下操作(scala)
rdd.groupBy(x => x._2).mapValues(x => x.map(y => y._3)).zipWithIndex()
给定rdd为
(0,a,1)
(1,a,2)
(2,a,5)
(3,b,1)
(4,b,2)
(5,b,3)
(6,b,5)
(7,c,1)
(8,c,2)
结果是
((a,List(1, 2, 5)),0)
((b,List(1, 2, 3, 5)),1)
((c,List(1, 2)),2)
【讨论】:
【参考方案2】:Groupby
带有collect_list
函数的订单和带有row_number
的唯一ID 应该适用于您的情况
from pyspark.sql import functions as F
df.groupBy("order").agg(F.collect_list("items"))
.withColumn("id", F.row_number().over(Window.orderBy("order")))
希望这会有所帮助!
【讨论】:
以上是关于使用 groupby 或 aggregate 合并 RDD 或 DataFrame 中每个事务中的项目以进行 FP-growth的主要内容,如果未能解决你的问题,请参考以下文章
在 SQL 中使用 Group By 和 Aggregate - 获取错误“选择列表中的列无效,因为它不包含在聚合函数或 GROUP BY 中”
提高性能(矢量化?) pandas.groupby.aggregate
SQL --------------- GROUP BY 函数
Pandas GroupBy.agg() 抛出 TypeError: aggregate() 缺少 1 个必需的位置参数:'arg'
Groupby 和 Aggregate 以列表为元素的 pandas 列,并在列表中获取唯一值
pandas使用groupby函数进行分组聚合使用agg函数指定聚合统计计算的数值变量并自定义统计计算结果的名称(naming columns after aggregation)