如何实现并行,以这样的方式延迟,当输出低于阈值时并行化 for 循环停止?

Posted

技术标签:

【中文标题】如何实现并行,以这样的方式延迟,当输出低于阈值时并行化 for 循环停止?【英文标题】:How to implement parallel, delayed in such a way that the parallelized for loop stops when output goes below a threshold? 【发布时间】:2020-03-20 10:11:29 【问题描述】:

假设我有以下代码:

from scipy import *
import multiprocessing as mp
num_cores = mp.cpu_count()
from joblib import Parallel, delayed
import matplotlib.pyplot as plt

def func(x,y):
    return y/x
def main(y, xmin,xmax, dx):
    x = arange(xmin,xmax,dx)
    output = Parallel(n_jobs=num_cores)(delayed(func)(i, y) for i in x)
    return x, asarray(output)
def demo():
    x,z = main(2.,1.,30.,.1)
    plt.plot(x,z, label='All values')
    plt.plot(x[z>.1],z[z>.1], label='desired range') ## This is better to do in main()
    plt.show()

demo()

我只想计算输出直到输出>给定数字(可以假设输出元素随着x的增加而单调减少)然后停止(不计算x的所有值然后排序,这对我来说效率低下目的)。有没有办法使用并行、延迟或任何其他多处理来做到这一点?

【问题讨论】:

你也可以使用 numpy。我添加了几个数字。演示函数中的选择[z>.1]应该在main函数中进行,这样代码效率更高。 我知道这会很混乱,但我会创建一个列表,将其传递给函数,然后函数会将结果附加到该列表中。然后在外面我会检查列表是否包含比这个更高的数字,然后以某种方式终止线程。现在我考虑到这一点,可能有更聪明的方法可以做到这一点,比如队列 【参考方案1】:

没有指定output > a given number,所以我只是编了一个。测试后我不得不扭转 正常运行的条件output < a given number

我会使用一个池,使用回调函数启动进程以检查停止条件,然后终止池 准备好时。但这会导致竞争条件,这将允许从正在运行的进程中省略结果 不允许完成。我认为这种方法对您的代码的修改最少,并且非常易于阅读。这 不保证列表的顺序。

优点:开销很小 缺点:可能会丢失结果。

方法一)

from scipy import *
import multiprocessing

import matplotlib.pyplot as plt


def stop_condition_callback(ret):
        output.append(ret)
        if ret < stop_condition:
            worker_pool.terminate()


def func(x, y, ):
    return y / x


def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    print("Number of calculations: %d" % (len(x)))

    # add calculations to the pool
    for i in x:
        worker_pool.apply_async(func, (i, y,), callback=stop_condition_callback)

    # wait for the pool to finish/terminate
    worker_pool.close()
    worker_pool.join()

    print("Number of results: %d" % (len(output)))
    return x, asarray(output)


def demo():
    x, z_list = main(2., 1., 30., .1)
    plt.plot(z_list, label='desired range')
    plt.show()


output = []
stop_condition = 0.1

worker_pool = multiprocessing.Pool()
demo()

此方法有更多开销,但允许已开始的进程完成。 方法二)

from scipy import *
import multiprocessing

import matplotlib.pyplot as plt


def stop_condition_callback(ret):
    if ret is not None:
        if ret < stop_condition:
            worker_stop.value = 1
        else:
            output.append(ret)


def func(x, y, ):
    if worker_stop.value != 0:
        return None
    return y / x


def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    print("Number of calculations: %d" % (len(x)))

    # add calculations to the pool
    for i in x:
        worker_pool.apply_async(func, (i, y,), callback=stop_condition_callback)

    # wait for the pool to finish/terminate
    worker_pool.close()
    worker_pool.join()

    print("Number of results: %d" % (len(output)))
    return x, asarray(output)


def demo():
    x, z_list = main(2., 1., 30., .1)
    plt.plot(z_list, label='desired range')
    plt.show()


output = []
worker_stop = multiprocessing.Value('i', 0)
stop_condition = 0.1

worker_pool = multiprocessing.Pool()
demo()

方法 3) 优点:不会遗漏任何结果 缺点:这超出了您通常的做法。

采取方法一并添加

def stopPoolButLetRunningTaskFinish(pool):
    # Pool() shutdown new task from being started, by emptying the query all worker processes draw from
    while pool._task_handler.is_alive() and pool._inqueue._reader.poll():
        pool._inqueue._reader.recv()
    # Send sentinels to all worker processes
    for a in range(len(pool._pool)):
            pool._inqueue.put(None)

那就换stop_condition_callback

def stop_condition_callback(ret):
    if ret[1] < stop_condition:
        #worker_pool.terminate()
        stopPoolButLetRunningTaskFinish(worker_pool)
    else:
        output.append(ret)

【讨论】:

【参考方案2】:

我会使用 Dask 来并行执行,特别是 futures 接口用于在结果完成时实时反馈结果。完成后,您可以取消剩余的正在运行的期货,租用不需要的期货以异步完成或关闭集群。

from dask.distributed import Client, as_completed
client = Client()  # defaults to ncores workers, one thread each
y, xmin, xmax, dx = 2.,1.,30.,.1

def func(x, y):
    return x, y/x
x = arange(xmin,xmax,dx)
outx = []
output = []
futs = [client.submit(func, val, y) for val in x]
for future in as_completed(futs):
    outs = future.result()
    outx.append(outs[0])
    output.append(outs[1])
    if outs[1] < 0.1:
        break

注意事项: - 我假设你的意思是“小于”,否则第一个值已经通过(y / xmin &gt; 0.1) - 如果您想在结果准备就绪时获取结果,则不能保证输出按照输入顺序排列,但是通过如此快速的计算,也许它们总是如此(这就是为什么我也让 func 返回输入值的原因) - 如果你停止计算,输出将比完整的输入集短,所以我不太确定你想打印什么。

【讨论】:

以上是关于如何实现并行,以这样的方式延迟,当输出低于阈值时并行化 for 循环停止?的主要内容,如果未能解决你的问题,请参考以下文章

如何实现低于1ms的延迟?

串行输入输出和并行输入输出的区别解析

Weblogic 9中如何实现并行处理

如何实现1080P延迟低于500ms的实时超清直播传输技术

如何以与 np.where 相同的方式使用 Tensorflow.where?

如何删除每组计数低于阈值的记录?