spark中的嵌套for循环并行化

Posted

技术标签:

【中文标题】spark中的嵌套for循环并行化【英文标题】:Nested for-loop parallelization in spark 【发布时间】:2019-03-31 05:39:41 【问题描述】:

我正在尝试并行化 spark 中的现有算法(以可扩展的方式)。出于问题的目的,我已对其进行了简化,但它看起来像这样:

for p in all_p:
    all_q = calculate1(p)

    results_for_p = []
    for q in all_q:
        results_for_p.append(calculate2(q))

    save_results(results_for_p)

基本上,我已经将 for 循环嵌套了 2 个我想并行运行的长时间运行的函数。但是,嵌套函数calculate2 的参数的大小取决于每个p

我的尝试是扁平化输入,以便 calculate2 一起在 all_q 和 all_p 上运行:

rdd = sc.parallelize(all_p)
all_q_each_p = rdd.map(calculate1).collect()

# flatten output to something we can parallelize:
all_q_all_p = []
for all_q in all_q_each_p:
    all_q_all_p.append(all_q)

rdd = sc.parallelize(all_q_all_p)
res = rdd.map(calculate2).collect()

# How to do this?? 
collect_and_save_all_results(res)

如何以一种可以很好扩展的方式编写这个?

【问题讨论】:

我不确定我们是否可以谈论嵌套并行化。 @thebluephantom 我已将标题更新为“嵌套 for 循环并行化” 【参考方案1】:

这正是flatMap 解决的问题类型。 flatMap 默认调整 rdd 的大小。

代码变得简单多了:

rdd = sc.parallelize(all_p)

rdd.flatMap(calculate1).map(
    lambda args: calculate2(*args)
).collect()

【讨论】:

以上是关于spark中的嵌套for循环并行化的主要内容,如果未能解决你的问题,请参考以下文章

在 OpenMP 中并行化嵌套循环并使用更多线程执行内部循环

使用 OpenMP 在 C、C++ 中并行化嵌套 for 循环的几种方法之间的区别

在 Python 中并行化四个嵌套循环

如何使用CUDA并行化嵌套for循环以在2D数组上执行计算

开放式加速器 | Fortran 90:并行化嵌套 DO 循环的最佳方法是啥?

如何优化并行嵌套循环?