使用 Spark 进行 RDD 和元组操作
Posted
技术标签:
【中文标题】使用 Spark 进行 RDD 和元组操作【英文标题】:RDD and tuple manipulation with Spark 【发布时间】:2018-08-09 09:45:43 【问题描述】:我刚开始使用 Spark,但遇到一个问题,我真的不知道如何解决。
我的输入是如下形式的 RDD:
[(u'7362', (u'2016-06-29 09:58:35', 0)), (u'8600', (u'2016-06-29 20:47:27', 1)), (u'f9f8', (u'2016-07-01 00:48:55', 2)), (u'c6c9', (u'2016-07-04 20:29:19', 3)), (u'218b', (u'2016-07-05 20:54:45', 4))]
所以结构是
[(user_id, (time_stamp, index))]
我需要做的是返回一个列表列表或元组列表。
如果我们以表格格式查看 RDD,这会更容易。假设这是上述 rdd 的更“经典”表格形式。
[(u'7362', (u'2016-06-29 09:58:35', 0)),
(u'8600', (u'2016-06-29 20:47:27', 1)),
(u'7362', (u'2016-07-01 00:48:55', 2)),
(u'c6c9', (u'2016-07-04 20:29:19', 3)),
(u'8600', (u'2016-07-05 20:54:45', 4))]
我需要首先按用户对这个 RDD 进行分组,所以我最终会得到三个 RDD(每个用户一个,一个用于非重复用户)。
[(u'7362', (u'2016-06-29 09:58:35', 0)),
(u'7362', (u'2016-07-01 00:48:55', 2))]
[(u'8600', (u'2016-06-29 20:47:27', 1)),
(u'8600', (u'2016-07-05 20:54:45', 4))]
[(u'c6c9', (u'2016-07-04 20:29:19', 3))]
现在,对于只包含一个 user_id(前两个)的“表”,我需要计算每一行与前一行的时间差,以及每个索引与每个前一个索引的时间差(考虑到这些“表”有很多超过 2 行)。 这将给出如下表格:
[(u'7362', (35, 2)), (u'7362', (21, 1)), (u'7362', (12, 3)), (u'7362', (41, 2)), (u'7362', (46, 2)), (u'7362', (31, 1)), (u'7362', (97, 3)) ...]
对于第一个 user_id,
[(u'8600', (78, 2)), (u'8600', (43, 1)), (u'8600', (99, 3)), (u'8600', (3, 2)), (u'8600', (15, 2)), (u'8600', (42, 1)), (u'8600', (11, 3)) ...]
以此类推,对于格式为[(user_idx, (dt=t2-t1, didx=idx2-idx1))]
的所有用户,dt
和didx
是通过从所有行的当前行值中减去前一行值得出的。
最后,当我拥有上述所有表格时,对于每个用户,我想将它们分组到一个嵌套列表中:
[[1, [21, 31, 43, 42]], [2, [35, 41, 46, 78, 3, 15]], [3, [12, 97, 99, 11]]]
所以最终格式是[[didx, [dt1, dt2 ...]]
,我不再关心 user_ids,只关心所有用户的索引和与同一索引相关的每个时间差异。
我试图尽可能清楚地解释这个问题,但是,就像我说的,我才真正开始使用 Spark,而且我知道这里没有真正的表格。
【问题讨论】:
【参考方案1】:当你groupBy userId时,这不会导致多个RDD,而是一个RDD [(UserId,list [(time,index)]形式的RDD。所以在这种情况下,我会做groupBy,然后将用户列表处理成格式,然后按照你说的didx分组,最后从RDD收集结果到列表。
# assume each user has more than one event
# if this ^ assumption is incorrect, you could filter the RDD before processing to get rid of users
# with only one event
# also, assume timesteps can be subtracted (there are libraries for this)
def process(indexedTimes):
num_times = len(indexedTimes)
new_list = []
for i in range(1,num_times):
new_list.append((indexedTimes[i][1]-indexedTimes[i-1][1],datediff(indexedTimes[i][0]-indexedTimes[i-1][0])))
return new_list
data # RDD[(userId, (timestep, index))]
.groupByKey # now RDD[(userId, list[(timestep, index)])]
.flatMap(lambda userList: process(list(userList[1]))) # now RDD[(didx, dt)]
.groupByKey # now RDD[(didx, list(dt))]
.collect # get elements in final list instead of RDD
【讨论】:
您好,非常感谢您的回答。一件小事,一个“)”需要在.flatMap 之后插入。我实现了这个并且它运行了一点,但是它抛出了一个:TypeError:'ResultIterable'对象不支持索引。但是我找到了这个答案:***.com/questions/42978738/… 但我不知道如何使其适应 indexedTimes 元素。 啊,是的,我明白你的意思,你可以在函数之前将 ResultIterable 对象类型转换为列表,它应该可以工作。我编辑了答案 这样就差不多解决了。我现在收到一个错误,因为我的时间不能减去。我用datediff(indexedTimes[i][1], indexedTimes[i-1][1])
替换了函数中的行,但我得到了一个:AttributeError: 'NoneType' object has no attribute '_jvm'.
,我确信这是由于diff
函数。
日期在indexedTimes[i][0]
,而不是indexedTimes[i][1]
。换句话说,您尝试在索引上使用 datediff,而不是日期。在函数中,(date, index) 倒转为(difference of index, difference of date) 因为要根据索引的不同进行分组,所以这个值需要在元组中排在第一位。所以尝试将 datediff 移动到indexedTimes[i][0]-indexedTimes[i-1][0]
,而不是indexedTimes[i][1]-indexedTimes[i-1][1]
。我编辑了答案
如果您仍然遇到同样的错误,请查看此答案:***.com/questions/40297403/…以上是关于使用 Spark 进行 RDD 和元组操作的主要内容,如果未能解决你的问题,请参考以下文章
如何对 spark scala RDD 中的元组列表/数组执行转换?
Spark 2.0:如何将元组的 RDD 转换为 DF [重复]