Spark join 速度呈指数级增长

Posted

技术标签:

【中文标题】Spark join 速度呈指数级增长【英文标题】:Spark join exponentially slow 【发布时间】:2014-10-07 14:47:46 【问题描述】:

我正在尝试加入两个 Spark RDD。我有一个与类别相关联的事务日志。我已将我的事务 RDD 格式化为以类别 ID 作为键。

transactions_cat.take(3)
[(u'707', [u'86246', u'205', u'7', u'707', u'1078778070', u'12564', u'2012-03-02 00:00:00', u'12', u'OZ', u'1', u'7.59']), 
(u'6319', [u'86246', u'205', u'63', u'6319', u'107654575', u'17876', u'2012-03-02 00:00:00', u'64', u'OZ', u'1', u'1.59']), 
(u'9753', [u'86246', u'205', u'97', u'9753', u'1022027929', u'0', u'2012-03-02 00:00:00', u'1', u'CT', u'1', u'5.99'])]

categories.take(3)
[(u'2202', 0), (u'3203', 0), (u'1726', 0)]

事务日志大约 20 GB(3.5 亿行)。 类别列表小于 1KB。

当我跑步时

transactions_cat.join(categories).count()

Spark 开始变得非常缓慢。我有一个有 643 个任务的阶段。前 10 个任务大约需要 1 分钟。然后每个任务越来越慢(第 60 个任务大约需要 15 分钟)。我不确定出了什么问题。

请查看这些屏幕截图以获得更好的想法。

我正在运行 Spark 1.1.0,有 4 名工作人员使用 python shell,总内存为 50 GB。 仅计算事务 RDD 非常快(30 分钟)

【问题讨论】:

【参考方案1】:

问题可能是 Spark 没有注意到您有一个简单的连接问题案例。当你加入的两个RDDs 之一太小时,最好不要成为RDD。然后,您可以推出自己的hash join 实现,这实际上比听起来要简单得多。基本上,您需要:

使用collect() 将您的类别列表从RDD 中提取出来——由此产生的通信将很容易收回成本(或者,如果可能,首先不要将其设为RDD) 将其转换为一个哈希表,其中一个条目包含一个键的所有值(假设您的键不是唯一的) 对于大 RDD 中的每一对,在哈希表中查找键并为列表中的每个值生成一对(如果未找到,则该特定对不会产生任何结果)

我有一个implementation in Scala -- 随时询问有关翻译的问题,但应该很容易。

另一个有趣的可能性是尝试使用Spark SQL。我很确定该项目的长期目标将包括自动为您执行此操作,但我不知道他们是否已经实现了。

【讨论】:

嗨,我面临着完全相同的问题,尽管是数据帧。七个 10x2 数据帧的简单连接,然后在结果上运行 count(),使 Spark 失去了控制,包含 1000 多个任务和 14 个阶段。有什么方法可以在 Spark 之前的手动加入中修复/优化这个问题?

以上是关于Spark join 速度呈指数级增长的主要内容,如果未能解决你的问题,请参考以下文章

使用 std::vector 的指数内存消耗增长

如何计算每个呈指数增长的数字的总和?

Redshift 表在生产集群中占用的磁盘空间呈指数增长

docker devicemapper 数据文件大小呈指数增长并消耗我的主机磁盘 90%

SciKit One-class SVM 分类器训练时间随着训练数据的大小呈指数增长

斐波那契递归的优化及指数计算的优化