Spark join 速度呈指数级增长
Posted
技术标签:
【中文标题】Spark join 速度呈指数级增长【英文标题】:Spark join exponentially slow 【发布时间】:2014-10-07 14:47:46 【问题描述】:我正在尝试加入两个 Spark RDD。我有一个与类别相关联的事务日志。我已将我的事务 RDD 格式化为以类别 ID 作为键。
transactions_cat.take(3)
[(u'707', [u'86246', u'205', u'7', u'707', u'1078778070', u'12564', u'2012-03-02 00:00:00', u'12', u'OZ', u'1', u'7.59']),
(u'6319', [u'86246', u'205', u'63', u'6319', u'107654575', u'17876', u'2012-03-02 00:00:00', u'64', u'OZ', u'1', u'1.59']),
(u'9753', [u'86246', u'205', u'97', u'9753', u'1022027929', u'0', u'2012-03-02 00:00:00', u'1', u'CT', u'1', u'5.99'])]
categories.take(3)
[(u'2202', 0), (u'3203', 0), (u'1726', 0)]
事务日志大约 20 GB(3.5 亿行)。 类别列表小于 1KB。
当我跑步时
transactions_cat.join(categories).count()
Spark 开始变得非常缓慢。我有一个有 643 个任务的阶段。前 10 个任务大约需要 1 分钟。然后每个任务越来越慢(第 60 个任务大约需要 15 分钟)。我不确定出了什么问题。
请查看这些屏幕截图以获得更好的想法。
我正在运行 Spark 1.1.0,有 4 名工作人员使用 python shell,总内存为 50 GB。 仅计算事务 RDD 非常快(30 分钟)
【问题讨论】:
【参考方案1】:问题可能是 Spark 没有注意到您有一个简单的连接问题案例。当你加入的两个RDD
s 之一太小时,最好不要成为RDD
。然后,您可以推出自己的hash join 实现,这实际上比听起来要简单得多。基本上,您需要:
collect()
将您的类别列表从RDD
中提取出来——由此产生的通信将很容易收回成本(或者,如果可能,首先不要将其设为RDD
)
将其转换为一个哈希表,其中一个条目包含一个键的所有值(假设您的键不是唯一的)
对于大 RDD
中的每一对,在哈希表中查找键并为列表中的每个值生成一对(如果未找到,则该特定对不会产生任何结果)
我有一个implementation in Scala -- 随时询问有关翻译的问题,但应该很容易。
另一个有趣的可能性是尝试使用Spark SQL。我很确定该项目的长期目标将包括自动为您执行此操作,但我不知道他们是否已经实现了。
【讨论】:
嗨,我面临着完全相同的问题,尽管是数据帧。七个 10x2 数据帧的简单连接,然后在结果上运行 count(),使 Spark 失去了控制,包含 1000 多个任务和 14 个阶段。有什么方法可以在 Spark 之前的手动加入中修复/优化这个问题?以上是关于Spark join 速度呈指数级增长的主要内容,如果未能解决你的问题,请参考以下文章
docker devicemapper 数据文件大小呈指数增长并消耗我的主机磁盘 90%