使用 Spark 进行 RDD 和元组操作

Posted

技术标签:

【中文标题】使用 Spark 进行 RDD 和元组操作【英文标题】:RDD and tuple manipulation with Spark 【发布时间】:2018-08-09 09:45:43 【问题描述】:

我刚开始使用 Spark,但遇到一个问题,我真的不知道如何解决。

我的输入是如下形式的 RDD:

[(u'7362', (u'2016-06-29 09:58:35', 0)), (u'8600', (u'2016-06-29 20:47:27', 1)), (u'f9f8', (u'2016-07-01 00:48:55', 2)), (u'c6c9', (u'2016-07-04 20:29:19', 3)), (u'218b', (u'2016-07-05 20:54:45', 4))]

所以结构是

[(user_id, (time_stamp, index))]

我需要做的是返回一个列表列表或元组列表。

如果我们以表格格式查看 RDD,这会更容易。假设这是上述 rdd 的更“经典”表格形式。

[(u'7362', (u'2016-06-29 09:58:35', 0)), 
(u'8600', (u'2016-06-29 20:47:27', 1)), 
(u'7362', (u'2016-07-01 00:48:55', 2)), 
(u'c6c9', (u'2016-07-04 20:29:19', 3)), 
(u'8600', (u'2016-07-05 20:54:45', 4))]

我需要首先按用户对这个 RDD 进行分组,所以我最终会得到三个 RDD(每个用户一个,一个用于非重复用户)。

[(u'7362', (u'2016-06-29 09:58:35', 0)), 
(u'7362', (u'2016-07-01 00:48:55', 2))]

[(u'8600', (u'2016-06-29 20:47:27', 1)), 
(u'8600', (u'2016-07-05 20:54:45', 4))]

[(u'c6c9', (u'2016-07-04 20:29:19', 3))]

现在,对于只包含一个 user_id(前两个)的“表”,我需要计算每一行与前一行的时间差,以及每个索引与每个前一个索引的时间差(考虑到这些“表”有很多超过 2 行)。 这将给出如下表格:

[(u'7362', (35, 2)), (u'7362', (21, 1)), (u'7362', (12, 3)), (u'7362', (41, 2)), (u'7362', (46, 2)), (u'7362', (31, 1)), (u'7362', (97, 3)) ...]

对于第一个 user_id,

[(u'8600', (78, 2)), (u'8600', (43, 1)), (u'8600', (99, 3)), (u'8600', (3, 2)), (u'8600', (15, 2)), (u'8600', (42, 1)), (u'8600', (11, 3)) ...]

以此类推,对于格式为[(user_idx, (dt=t2-t1, didx=idx2-idx1))] 的所有用户,dtdidx 是通过从所有行的当前行值中减去前一行值得出的。

最后,当我拥有上述所有表格时,对于每个用户,我想将它们分组到一个嵌套列表中:

[[1, [21, 31, 43, 42]], [2, [35, 41, 46, 78, 3, 15]], [3, [12, 97, 99, 11]]]

所以最终格式是[[didx, [dt1, dt2 ...]],我不再关心 user_ids,只关心所有用户的索引和与同一索引相关的每个时间差异。

我试图尽可能清楚地解释这个问题,但是,就像我说的,我才真正开始使用 Spark,而且我知道这里没有真正的表格。

【问题讨论】:

【参考方案1】:

当你groupBy userId时,这不会导致多个RDD,而是一个RDD [(UserId,list [(time,index)]形式的RDD。所以在这种情况下,我会做groupBy,然后将用户列表处理成格式,然后按照你说的didx分组,最后从RDD收集结果到列表。

# assume each user has more than one event 
# if this ^ assumption is incorrect, you could filter the RDD before processing to get rid of users 
# with only one event
# also, assume timesteps can be subtracted (there are libraries for this)
def process(indexedTimes):
    num_times = len(indexedTimes)
    new_list = []
    for i in range(1,num_times):
        new_list.append((indexedTimes[i][1]-indexedTimes[i-1][1],datediff(indexedTimes[i][0]-indexedTimes[i-1][0])))
    return new_list

data                                                     # RDD[(userId, (timestep, index))]
  .groupByKey                                            # now RDD[(userId, list[(timestep, index)])]
  .flatMap(lambda userList: process(list(userList[1])))  # now RDD[(didx, dt)]
  .groupByKey                                            # now RDD[(didx, list(dt))]
  .collect                                               # get elements in final list instead of RDD

【讨论】:

您好,非常感谢您的回答。一件小事,一个“)”需要在.flatMap 之后插入。我实现了这个并且它运行了一点,但是它抛出了一个:TypeError:'ResultIterable'对象不支持索引。但是我找到了这个答案:***.com/questions/42978738/… 但我不知道如何使其适应 indexedTimes 元素。 啊,是的,我明白你的意思,你可以在函数之前将 ResultIterable 对象类型转换为列表,它应该可以工作。我编辑了答案 这样就差不多解决了。我现在收到一个错误,因为我的时间不能减去。我用datediff(indexedTimes[i][1], indexedTimes[i-1][1]) 替换了函数中的行,但我得到了一个:AttributeError: 'NoneType' object has no attribute '_jvm'.,我确信这是由于diff 函数。 日期在indexedTimes[i][0],而不是indexedTimes[i][1]。换句话说,您尝试在索引上使用 datediff,而不是日期。在函数中,(date, index) 倒转为(difference of index, difference of date) 因为要根据索引的不同进行分组,所以这个值需要在元组中排在第一位。所以尝试将 datediff 移动到indexedTimes[i][0]-indexedTimes[i-1][0],而不是indexedTimes[i][1]-indexedTimes[i-1][1]。我编辑了答案 如果您仍然遇到同样的错误,请查看此答案:***.com/questions/40297403/…

以上是关于使用 Spark 进行 RDD 和元组操作的主要内容,如果未能解决你的问题,请参考以下文章

从 RDD 中的元组中解包项目时出现 Spark 错误

列表和元组

如何对 spark scala RDD 中的元组列表/数组执行转换?

Spark 2.0:如何将元组的 RDD 转换为 DF [重复]

Spark in Python Working with Tuples - 如何在加入两个 RDD 后合并两个元组

Python的集合和元组