如何将一个RDD拆分成多个RDD并相互比较

Posted

技术标签:

【中文标题】如何将一个RDD拆分成多个RDD并相互比较【英文标题】:How to split one RDD into multiple RDD and compare each other 【发布时间】:2018-07-29 07:35:51 【问题描述】:

我有如下的主 RDD

 [(u'google', u'00e293fd-80df-47c2-8762-b96c0b5e71c5', 'week 22'),
 (u'google', u'00e293fd-80df-47c2-8762-b96c0b5e71c5', 'week 22'),
 (u'google', u'00e293fd-80df-47c2-8762-b96c0b5e71c5', 'week 23'),
 (u'google', u'00e293fd-80df-47c2-8762-b96c0b5e71c5', 'week 24'),
 (u'google', u'00e293fd-80df-47c2-8762-b96c0b5e71c5', 'week 25'),
 (u'Facebook', u'00e293fd-80df-47c2-8762-b96c0b5e71c5', 'week 26'),
 (u'Facebook', u'00e293fd-80df-47c2-8762-b96c0b5e71c5', 'week 27'),
 (u'google', u'00e293fd-80df-47c2-8762-b96c0b5e71c5', 'week 28'),
 (u'google', u'00e293fd-80df-47c2-8762-b96c0b5e71c5', 'week 29'),
 (u'Facebook', u'0532e64a-7163-46a1-92ba-286b2a47bed5', 'week 30')]

我想根据第三列(按周索引)拆分它们以进行同期群分析。我想到的一种方法是将RDD转换为基于星期列的数据框和分区并将其保存在文本文件中并单独阅读并相互比较。有没有更好的办法?

【问题讨论】:

你想进行什么样的比较? 可以说,100 人在第一周从谷歌来源访问了网站。我想看看第 1 周有多少来自 google 来源的人在第 2 周、第 3 周等中被保留。 如果您想使用不同的 RDD,您可以使用带有返回值键值对的映射转换。之后,使用查找功能,您可以获得所需键的 RDD。但对于你的结果,我想,它没有用。您应该创建一个数据框并在此数据框上按 SQL 运行组。也许在 SQL 之后,您可以按周列划分。 【参考方案1】:

我将RDD转换为Dataframe并使用以下代码按源保存到分区

cohort_df = cohort.toDF(["source", "userId", "cohortId"])
cohort_df.write.partitionBy("source").csv("cohorts")

而且,我通过如下函数过滤将 RDD 分成数周

def week24(row):
    return "week24" == row[1]

week22_rdd, week23_rdd, week24_rdd, week25_rdd, week26_rdd = (cohortGroup_rdd.filter(f).map(lambda f: f[0]).distinct() for f in (week22, week23, week24, week25, week26))

并且,使用两个RDD的交集来查找比较两个RDD并获得共同的ID,如下所示:

new_rdd = week22_rdd.intersection(week23_rdd)

这是让我前进的最佳选择。

【讨论】:

【参考方案2】:

当这是所有需要处理的信息时,它有点难以提供帮助,但您始终可以创建一个数据框和按周执行分组或聚合操作。和创建 SQL 查询进行比较。它适用于您在 cmets 中给出的示例。

【讨论】:

以上是关于如何将一个RDD拆分成多个RDD并相互比较的主要内容,如果未能解决你的问题,请参考以下文章

将一个 RDD 拆分为多个 RDDS

火花。将 RDD 拆分为批次

Spark中将一个RDD严格划分为多个RDD

将 DataFrame 转换为 RDD 并将 RDD 动态拆分为与 DataFrame 相同数量的 Columns

如何将三个 RDD 加入一个元组?

如何通过 Delimiter 拆分 Spark RDD 的行