使用 Concurrent.Futures.ProcessPoolExecutor 运行同时和独立的 ABAQUS 模型

Posted

技术标签:

【中文标题】使用 Concurrent.Futures.ProcessPoolExecutor 运行同时和独立的 ABAQUS 模型【英文标题】:Using Concurrent.Futures.ProcessPoolExecutor to run simultaneous & independents ABAQUS models 【发布时间】:2016-12-04 16:06:27 【问题描述】:

我希望运行总共 nAnalysis=25 个 Abaqus 模型,每个模型使用 X 个核心,并且我可以同时运行 nParallelLoops= 5 个这些模型。如果当前 5 个分析中的一个完成,则应开始另一个分析,直到所有 nAnalysis 完成。

我根据12中发布的解决方案实现了以下代码。但是,我遗漏了一些东西,因为所有 nAnalysis 都尝试从“一次”开始,代码死锁并且没有分析完成,因为许多人可能想要使用相同的核心比已经开始的分析正在使用。

    Using Python's Multiprocessing module to execute simultaneous and separate SEAWAT/MODFLOW model runs How to parallelize this nested loop in Python that calls Abaqus
def runABQfile(*args):    
    import subprocess
    import os

    inpFile,path,jobVars = args

    prcStr1 = (path+'/runJob.sh')

    process = subprocess.check_call(prcStr1, stdin=None, stdout=None, stderr=None, shell=True, cwd=path)

def safeABQrun(*args):
    import os

    try:
        runABQfile(*args)
    except Exception as e:
        print("Tread Error: %s runABQfile(*%r)" % (e, args))

def errFunction(ppos, *args):
    import os
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import as_completed
    from concurrent.futures import wait

    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0,nAnalysis))  # 5Nodes
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

到目前为止,我能够运行的唯一方法是,如果我修改 errFunction 以当时正好使用 5 次分析,如下所示。但是,这种方法有时会导致其中一个分析花费的时间比每组中的其他 4 个(每个ProcessPoolExecutor 调用)要长得多,因此尽管有资源(核心)可用,下一组 5 个也不会开始。最终,这会导致完成所有 25 个模型的时间更长。

def errFunction(ppos, *args):
    import os
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import as_completed
    from concurrent.futures import wait    

    # Group 1
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0,5))  # 5Nodes        
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

    # Group 2
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(5,10))  # 5Nodes        
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

    # Group 3
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(10,15))  # 5Nodes        
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

    # Group 4
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(15,20))  # 5Nodes        
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

    # Group 5
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(20,25))  # 5Nodes        
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

我尝试使用as_completed 函数,但它似乎也不起作用。

请您帮忙找出正确的并行化,以便我可以运行 nAnalysis,始终使用 nParallelLoops同时运行? 感谢您的帮助。 我正在使用 Python 2.7

最好的, 大卫·P。


2016 年 7 月 30 日更新

我在 safeABQrun 中引入了一个循环,它管理 5 个不同的“队列”。该循环是必要的,以避免分析试图在一个节点中运行而另一个节点仍在运行的情况。在开始任何实际分析之前,分析已预先配置为在请求的节点之一中运行。

def safeABQrun(*list_args):
    import os

    inpFiles,paths,jobVars = list_args

    nA = len(inpFiles)
    for k in range(0,nA): 
        args = (inpFiles[k],paths[k],jobVars[k])
        try:
            runABQfile(*args) # Actual Run Function
        except Exception as e:
            print("Tread Error: %s runABQfile(*%r)" % (e, args))

def errFunction(ppos, *args):
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        futures = dict((executor.submit(safeABQrun, inpF, aPth, jVrs), k) for inpF, aPth, jVrs, k in list_args)  # 5Nodes

        for f in as_completed(futures):
            print("|=== Finish Process Train %d ===|" % futures[f])
            if f.exception() is not None:
               print('%r generated an exception: %s' % (futures[f], f.exception()))

【问题讨论】:

【参考方案1】:

对我来说看起来不错,但我无法按原样运行您的代码。尝试一些非常简单的东西,然后添加东西直到出现“问题”如何?例如,以下是否显示了您想要的行为类型?它在我的机器上运行,但我运行的是 Python 3.5.2。你说你正在运行 2.7,但concurrent.futures 在 Python 2 中不存在 - 所以如果你使用 2.7,你必须运行某人的库的反向移植,也许问题就出在于此。尝试以下应该有助于回答是否是这种情况:

from concurrent.futures import ProcessPoolExecutor, wait, as_completed

def worker(i):
    from time import sleep
    from random import randrange
    s = randrange(1, 10)
    print("%d started and sleeping for %d" % (i, s))
    sleep(s)

if __name__ == "__main__":
    nAnalysis = 25
    nParallelLoops = 5
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        futures = dict((executor.submit(worker, k), k) for k in range(nAnalysis))
        for f in as_completed(futures):
            print("got %d" % futures[f])

典型输出:

0 started and sleeping for 4
1 started and sleeping for 1
2 started and sleeping for 1
3 started and sleeping for 6
4 started and sleeping for 5
5 started and sleeping for 9
got 1
6 started and sleeping for 5
got 2
7 started and sleeping for 6
got 0
8 started and sleeping for 6
got 4
9 started and sleeping for 8
got 6
10 started and sleeping for 9
got 3
11 started and sleeping for 6
got 7
12 started and sleeping for 9
got 5
...

【讨论】:

【参考方案2】:

我在 safeABQrun 中引入了一个循环,它管理 5 个不同的“队列”。循环是必要的,以避免分析试图在一个节点中运行而另一个节点仍在运行的情况。在开始任何实际分析之前,分析已预先配置为在请求的节点之一中运行。

def safeABQrun(*list_args):
    import os

    inpFiles,paths,jobVars = list_args

    nA = len(inpFiles)
    for k in range(0,nA): 
        args = (inpFiles[k],paths[k],jobVars[k])
        try:
            runABQfile(*args) # Actual Run Function
        except Exception as e:
            print("Tread Error: %s runABQfile(*%r)" % (e, args))

def errFunction(ppos, *args):
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        futures = dict((executor.submit(safeABQrun, inpF, aPth, jVrs), k) for inpF, aPth, jVrs, k in list_args)  # 5Nodes

        for f in as_completed(futures):
            print("|=== Finish Process Train %d ===|" % futures[f])
            if f.exception() is not None:
               print('%r generated an exception: %s' % (futures[f], f.exception()))

【讨论】:

以上是关于使用 Concurrent.Futures.ProcessPoolExecutor 运行同时和独立的 ABAQUS 模型的主要内容,如果未能解决你的问题,请参考以下文章

在使用加载数据流步骤的猪中,使用(使用 PigStorage)和不使用它有啥区别?

今目标使用教程 今目标任务使用篇

Qt静态编译时使用OpenSSL有三种方式(不使用,动态使用,静态使用,默认是动态使用)

MySQL db 在按日期排序时使用“使用位置;使用临时;使用文件排序”

使用“使用严格”作为“使用强”的备份

Kettle java脚本组件的使用说明(简单使用升级使用)