一旦内核空闲,如何通过添加进程在python中执行批量计算?

Posted

技术标签:

【中文标题】一旦内核空闲,如何通过添加进程在python中执行批量计算?【英文标题】:How to perform batch computation in python by adding processes as soon as cores become free? 【发布时间】:2018-09-20 07:35:12 【问题描述】:

Bash 有一个函数“wait -n”,它可以用一种相对简单的方式来停止子进程的后续执行,直到有一定数量的处理器内核可用。例如。我可以做到以下几点,

for IJOB in IJOBRANGE;
do

    ./func.x $IJOB

    # checking the number of background processes
    # and halting the execution accordingly

    bground=( $(jobs -p) );

    if (( $#bground[@] >= CORES )); then
        wait -n
    fi

done || exit 1

这个 sn-p 可以用不同的参数批量执行任意 C 进程“func.x”,并始终保持固定数量的子进程并行实例,设置为值“CORES”。

我想知道是否可以使用 python 脚本完成类似的操作 python子进程(或函数)。目前,我定义了一个 python 函数,设置了一个一维参数数组,并使用 python 多处理模块中的 Pool 例程在参数数组上并行计算函数。池函数对我的函数执行一定数量(以下示例中的 CPU 核心数)的评估,并等待所有衍生进程的实例结束,然后再移动到下一批。

import multiprocessing as mp

def func(x):

    # some computation with x

def main(j):

    # setting the parameter array
    xarray = range(j)

    pool = mp.Pool()
    pool.map(func,xarray)

我想知道是否可以修改这个 sn-p 以便始终对我的子例程执行固定数量的并行计算,即在一个子进程完成后立即添加另一个进程。这里的所有“func”进程都应该是独立的,执行顺序也不重要。我是 python 方式的新手,如果有一些有用的观点真的很棒。

【问题讨论】:

你能澄清一下吗?我真的不明白你想要做什么。你的代码不是已经完成了你想要的(即“执行固定数量的并行计算”)吗? 如果信息不清楚,我很抱歉。在工作负载不对称的情况下,这可能会变得稍微复杂一些。假设我有 40 个内核,因此多处理池将我的参数数组拼接成 40 个批次。但是对于每组,它将等到所有这 40 个函数调用都完成。理想情况下,我希望有一个解决方案,例如,一旦当前批次中的 2 个完成后,将添加 2 个进一步的进程。那清楚吗?非常感谢您的回复。 那更清楚了,谢谢!我已经玩过这个模块了,似乎它不会等待所有任务完成来开始一个新的。我将发布一些示例代码作为答案,因为在 cmets 中很难做到。 【参考方案1】:

根据我们在 cmets 中的讨论,这里有一些改编自您的测试代码,显示 Pools 在将新任务分配给可用工作人员之前不会等待所有并行任务完成:

import multiprocessing as mp
from time import sleep, time


def func(x):
    """sleeps for x seconds"""
    name = mp.current_process().name
    print(" : sleep ".format(time(), name, x))
    sleep(x)
    print(" : done sleeping".format(time(), name))


def main():

    # A pool of two processes, for the sake of simplicity
    pool = mp.Pool(processes=2)
    # Here's how that works out visually:
    #
    #    0s        1s       2s        3s
    # P1 [sleep(1)][     sleep(2)     ]
    # P2 [     sleep(2)     ][sleep(1)]
    sleeps = [1, 2, 2, 1]
    pool.map(func, sleeps)


if __name__ == "__main__":
    main()

运行此代码给出(为清楚起见简化了时间戳):

$ python3 mp.py 
0s: ForkPoolWorker-1: sleep 1
0s: ForkPoolWorker-2: sleep 2
1s: ForkPoolWorker-1: done sleeping
1s: ForkPoolWorker-1: sleep 2
2s: ForkPoolWorker-2: done sleeping
2s: ForkPoolWorker-2: sleep 1
3s: ForkPoolWorker-1: done sleeping
3s: ForkPoolWorker-2: done sleeping

我们可以看到第一个进程在开始第二个任务之前没有等待第二个进程完成它的第一个任务。

所以我想这应该回答你提出的观点,希望我已经清楚地理解了你。

【讨论】:

真是个好主意,我自己测试了一下(但没有你那么好)并得出了同样的结论。我的测试包括使用不同的块大小对 imap 进行计时。不过你的更重要。 谢谢,我很感激。我也考虑过块大小,但似乎默认值是合理的,并且与参数没有任何关系。这就是我喜欢 Python 的地方:简单明了 :) 非常感谢这样一个干净的例子,我不能再欣赏它了。我注意到我原始计算中的一种行为,有时核心使用量下降到批次计数的一小部分,剩余大量批次。我很好奇为什么会发生这种情况,而我对该模块的有限了解导致我误解了它的范围。再次感谢您澄清困惑。

以上是关于一旦内核空闲,如何通过添加进程在python中执行批量计算?的主要内容,如果未能解决你的问题,请参考以下文章

js面试之EventLoop

如何使CPU占用率为50%

cpu利用率

python之psutil模块获取系统信息

如何在python中选择一个空闲端口号? [复制]

Linux的进程控制