spark中的嵌套for循环并行化
Posted
技术标签:
【中文标题】spark中的嵌套for循环并行化【英文标题】:Nested for-loop parallelization in spark 【发布时间】:2019-03-31 05:39:41 【问题描述】:我正在尝试并行化 spark 中的现有算法(以可扩展的方式)。出于问题的目的,我已对其进行了简化,但它看起来像这样:
for p in all_p:
all_q = calculate1(p)
results_for_p = []
for q in all_q:
results_for_p.append(calculate2(q))
save_results(results_for_p)
基本上,我已经将 for 循环嵌套了 2 个我想并行运行的长时间运行的函数。但是,嵌套函数calculate2
的参数的大小取决于每个p
。
我的尝试是扁平化输入,以便 calculate2 一起在 all_q 和 all_p 上运行:
rdd = sc.parallelize(all_p)
all_q_each_p = rdd.map(calculate1).collect()
# flatten output to something we can parallelize:
all_q_all_p = []
for all_q in all_q_each_p:
all_q_all_p.append(all_q)
rdd = sc.parallelize(all_q_all_p)
res = rdd.map(calculate2).collect()
# How to do this??
collect_and_save_all_results(res)
如何以一种可以很好扩展的方式编写这个?
【问题讨论】:
我不确定我们是否可以谈论嵌套并行化。 @thebluephantom 我已将标题更新为“嵌套 for 循环并行化” 【参考方案1】:这正是flatMap
解决的问题类型。 flatMap
默认调整 rdd 的大小。
代码变得简单多了:
rdd = sc.parallelize(all_p)
rdd.flatMap(calculate1).map(
lambda args: calculate2(*args)
).collect()
【讨论】:
以上是关于spark中的嵌套for循环并行化的主要内容,如果未能解决你的问题,请参考以下文章
在 OpenMP 中并行化嵌套循环并使用更多线程执行内部循环
使用 OpenMP 在 C、C++ 中并行化嵌套 for 循环的几种方法之间的区别