在火花中,如何同时运行几个“收集”

Posted

技术标签:

【中文标题】在火花中,如何同时运行几个“收集”【英文标题】:In spark, how to run several `collect` concurrently 【发布时间】:2016-11-16 23:52:50 【问题描述】:

我是 Spark 的新手,我不确定我是否理解它很好地分配作业的方式。

我有以下代码:

c1 = dict(smallrdd1.collect())
bigrdd1 = bigrdd1.filter(lambda val: c1[val])

c2 = dict(smallrdd2.collect())
bigrdd2 = bigrdd2.filter(lambda val: c2[val])

如果我理解得很好,这两个collect 将在主作业上运行,而不是同时运行,这会减慢整个过程。如何让 spark 在单独的作业中、在单独的节点上执行收集和过滤?

编辑

我认为我的问题不是很清楚。我会尽量让它更具体。

我想要实现的是对面向列的数据进行有效的星图连接。也就是说,我有几个维度表,每个维度表都有几百万个键值。当存储为原始 python dict 或 scala Map 时,它们每个都是几百 Mb。这足够小,可以在每个节点上单独放入内存中,但它们加在一起构成了几个 Gb,并且无法放入内存中。

另一方面,对于这些小维度列中的每一个,我都有一个大的事实列,它有几十亿行,并且肯定不适合内存。

这个想法是管理集群内的数据位置,以便每个大表位于不同的节点上。然后启动几个任务,每个任务负责一个join。然后从磁盘加载维度表并同时创建哈希图,然后将哈希图“广播”到那些需要它的任务。然后,将大事实列与维度表连接起来。

【问题讨论】:

两个收集不会同时运行,它是顺序的。主要是当你收集时,它会将所有元素收集到驱动程序,当你使用任何转换或操作(如过滤器、映射等)时......它将在单独的节点上运行。 是的,我明白了。但就我而言,我只需要在将执行过滤器的执行者上拥有这些dict。我不需要它们在驱动程序上。有没有办法做到这一点? 我不确定那个字典是什么...但是你为什么要先收集然后过滤?当您使用过滤器时,它将在执行程序上运行。 是的,过滤器当然会在执行器上运行。我正在尝试实现并行地图侧连接。这些 dict 是我的小表的哈希表。 【参考方案1】:

因此,collect 以及 takefirst 通常用于测试和调试。正如@Shankar 所写,collect 会将所有数据拉到驱动程序中,这对于测试期间使用的小数据集来说很好,但在处理大量数据集时就不行了,因为它会用 OOM 杀死你的驱动程序。

现在,您说要加入,所以只需使用join

val joined = bigrdd.join(smallrdd, bigrdd.col("id") === smallrdd.col("id"))

(这是 Scala 语法,但我相信你明白了)

现在数据已加入您的工作人员,您可以继续使用过滤器、地图和其他转换 :)

【讨论】:

是的,当然,我可以使用join。但问题是 join 是一项代价高昂的操作,当smallrdd 足够小时,就没有必要这样做了。 你建议的是 reduce-side 加入,而我想做一个 map-side 加入。如果我理解正确,spark 不会用我的小 rdd 自动构建一个 hashmap,但如果我使用join,它会在后台执行代价高昂的 shuffle 操作。我说的对吗? 看,我们正在努力帮助您,但您显然似乎最了解,所以我不确定您为什么在 SO 上发帖。我可以告诉你,我使用 Spark 已有 2 年了,如果你查看我的个人资料,你会发现我在 SO 上 96% 的声誉来自于回答 Spark 问题,所以我对 Spark 编程有所了解。要并行化您的查询,您需要使用join。如果您的 smallrdd 真的很小,为什么还要将其加载为 rdd?只需使用普通数组并广播即可。 如果我看起来很激进,我真的很抱歉,这不是目标!实际上,每个 smallrdd 都足够小以适合内存,但 smallrdd1 + smallrdd2 可能不适合。只广播一个,然后另一个的问题是它是顺序的。我想知道是否有办法同时做到这一点。 smallrdds 必须适合要广播的常规数组。如果他们不能,那么你的 smallrdds 不是 那么 小,你需要做一个joinjoin 确实会导致数据混洗,但 Spark 就是为此而构建的,而且它真的很擅长。当然,你应该警惕你正在洗牌的数据量,并确保你不会多次洗牌相同的数据,但大多数工作都需要洗牌。你的工作不是避免洗牌,而是避免不必要的洗牌。【参考方案2】:

所以,我认为对我的问题的简短回答是不可能

从我得到的答案来看,我认为这根本是不可能用 spark 做的。我们唯一的可能是在驱动程序中创建所有的hasmap,或者使用传统的spark join

驱动程序内部

顺序加载小维表,然后将它们广播给所有执行器(这是顺序的,并且会导致 Gbs 的无用数据移动):

c1 = sc.broadcast(smallrdd1.collectAsMap())
bigrdd1 = bigrdd1.filter(lambda val: c1.value[val])

c2 = sc.broadcast(smallrdd2.collectAsMap())
bigrdd2 = bigrdd2.filter(lambda val: c1.value[val])

关于执行者

不在驱动程序上计算任何东西,但计算连接的速度较慢,因为它涉及对大型数据集的 shuffle 操作。

bigrdd1 = bigrdd1 \
              .map(lambda (bigk,bigv):(bigv,bigk)) \
              .join(smallrdd1) \
              .map(lambda (bigv,(bigk,smallv)):(bigk,bigv))
bigrdd2 = bigrdd2 \
              .map(lambda (bigk,bigv):(bigv,bigk)) \
              .join(smallrdd2) \
              .map(lambda (bigv,(bigk,smallv)):(bigk,bigv))

【讨论】:

恐怕你不会在这里得到我的支持。你想要做的,根本不是你在 Spark 中做事的方式,而是你没有接受这一点并接受 Spark 方式,你似乎坚持收集 rdds 作为首选解决方案的想法......我非常不同意——不知道还能说什么。 如果我看起来很固执,或者让你浪费了时间,我真的很抱歉。但我真的想在这里实现一个算法,而不仅仅是为了完成任务。 对于那些感兴趣的人,我仍然将工作 Spark 代码放在我的答案中。我还能做什么?

以上是关于在火花中,如何同时运行几个“收集”的主要内容,如果未能解决你的问题,请参考以下文章

如何在火花中连接多个列,同时从另一个表中连接列名(每行不同)

NoClassDefFoundError:org/apache/hadoop/fs/StreamCapabilities,同时用火花读取s3数据

JVM垃圾收集器和收集器的选择策略

查询Spark同时加载的hive表时如何避免错误

如何在 lighttd 服务器中同时运行多个 php 版本?

如何实现javascript多线程同时运行?