没有参数/可迭代的函数的多处理池?

Posted

技术标签:

【中文标题】没有参数/可迭代的函数的多处理池?【英文标题】:Mulitprocessing pool for function with no arguments/iterable? 【发布时间】:2020-04-16 20:45:46 【问题描述】:

我在 GCE 平台上运行 Python 2.7 来进行计算。 GCE 实例启动、安装各种软件包、从存储桶复制 80 Gb 数据并运行带有 nohangup 的“workermaster.py”脚本。 workermaster 在一个无限循环上运行,该循环检查任务队列存储桶中的任务。当任务桶不为空时,它会选择一个随机文件(任务)并将工作传递给计算模块。如果无事可做,workermaster 会休眠几秒钟并再次检查任务列表。 workermaster 会持续运行,直到实例终止(或中断!)。

目前这工作得很好,但我的问题是我的代码只运行具有单个 CPU 的实例。如果我想扩大计算规模,我必须创建许多相同的单 CPU 实例,这意味着创建许多 80 Gb 磁盘并将数据每次传输给它们会产生很大的成本开销,即使计算只是“读取”任何特定计算的一小部分数据。我想通过让我的 workermaster 能够使用多个 CPU 来提高一切效率和成本效益,但是在阅读了许多关于 SO 的教程和其他问题之后,我完全感到困惑。

我想我可以将我的 workermaster 代码的重要部分变成一个函数,然后创建一个使用多处理模块“调用”它的进程池。一旦 workermaster 循环在每个 CPU 上运行,进程就不需要相互交互或以任何方式相互依赖,它们只是碰巧运行在同一个实例上。 workermaster 打印出关于它在计算中的位置的信息,我也很困惑如何将每个进程的“打印”语句分开,但我想这离我现在的位置只有几步之遥!我的问题/困惑是:

1) 我的 workermaster "def" 没有返回任何值,因为它只是启动了一个无限循环,因为每个 web 示例似乎都有 myresult = pool.map(.....) 格式的内容;和 2) 我的 workermaster "def" 不需要任何参数/输入 - 它只是运行,而我在 SO 和 Python Docs 上看到的多处理示例似乎具有可迭代性。

如果很重要,workermaster代码的简化版本是:

# module imports are here
# filepath definitions go here

def workermaster():

    while True:

        tasklist = cloudstoragefunctions.getbucketfiles('<my-task-queue-bucket')

        if tasklist:

            tasknumber = random.randint(2, len(tasklist))
            assignedtask = tasklist[tasknumber]

            print 'Assigned task is now: ' + assignedtask

            subprocess.call('gsutil -q cp gs://<my-task-queue-bucket>/' + assignedtask + ' "' + taskfilepath + assignedtask + '"', shell=True)

            tasktype = assignedtask.split('#')[0]

            if tasktype == 'Calculation':
                currentcalcid = assignedtask.split('#')[1]
                currentfilenumber = assignedtask.split('#')[2].replace('part', '')
                currentstartfile = assignedtask.split('#
                currentendfile = assignedtask.split('#')[4].replace('.csv', '')

                calcmodule.docalc(currentcalcid, currentfilenumber, currentstartfile, currentendfile)

            elif tasktype == 'Analysis':

                #set up and run analysis module, etc.                   

            print '   Operation completed!'

            os.remove(taskfilepath + assignedtask)

        else:

            print 'There are no tasks to be processed.  Going to sleep...'
            time.sleep(30)

我尝试使用多处理模块多次“调用”该函数。我想我需要使用“池”方法,所以我尝试了这个:

import multiprocessing

if __name__ == "__main__":

    p = multiprocessing.Pool()
    pool_output = p.map(workermaster, [])

我对文档的理解是,__name__ 行仅作为在 Windows 中进行多处理的一种解决方法(我正在为开发而做,但 GCE 在 Linux 上)。 p = multiprocessing.Pool() 行正在创建一个工作池,该池等于系统 CPU 的数量,因为未指定任何参数。如果 CPU 的数量为 1,那么我希望代码的行为与我尝试使用多处理之前的行为相同。最后一行是我不明白的。我认为它告诉池中的每个处理器“目标”(要运行的东西)是 workermaster。从文档来看,似乎有一个可迭代的强制性参数,但我真的不明白在我的情况下这是什么,因为 workermaster 不接受任何参数。我试过给它传递一个空列表、空字符串、空括号(元组?),但它什么也没做。

请问有人可以帮我吗?关于使用多处理有很多讨论,这个线程Mulitprocess Pools with different functions 和这个python code with mulitprocessing only spawns one process each time 似乎与我正在做的事情很接近,但仍然有可迭代作为参数。如果我遗漏了任何重要的内容,请提出建议,我会修改我的帖子 - 感谢任何可以提供帮助的人!

【问题讨论】:

pool 如果您想使用不同的参数运行相同的函数,则很有用。如果您只想运行一次函数,请使用 normall Process()。如果您想运行相同的功能 2 次,那么您可以手动创建 2 个Process()。如果您想使用 Pool() 运行 2 次,则添加带有 2 个参数的列表(即使您不需要它),因为它是 Pool() 运行 2 次的信息。但是,如果您运行 2 次函数,它适用于同一个文件夹,那么您可能会发生冲突 - 您将运行 2 次相同的任务。 如果你想使用 Pool 和 map,你需要重新定义你的函数以使用至少一个参数(你可以丢弃它)。 ***.com/questions/27689834/… 谢谢@furas 和@rajendra。我向 workerfunction 添加了一个参数,因此它现在是 def workermaster(x): 我还使用 x 作为变量来区分 CPU 线程,方法是将打印语句修改为 print 'CPU-' + str(x) + ': Status is now....' 等。我注意到使用pool.map 方法是我现在无法使用 CTRL+C 终止笔记本电脑上的进程。我必须关闭命令提示符并开始一个新的命令提示符 - 有什么特别的原因/解决方法吗?如果有人想写他们的回复作为答案,我很乐意接受。 谷歌python multiprocessing ctrl+c给我:Catch Ctrl+C / SIGINT and exit multiprocesses gracefully in python 【参考方案1】:

Pool() 如果您想使用不同的参数运行相同的函数,这很有用。

如果您只想运行一次函数,请使用普通的Process()。 如果你想运行相同的函数 2 次,那么你可以手动创建 2 个Process()

如果您想使用 Pool() 运行函数 2 次,则添加带有 2 个参数的列表(即使您不需要参数),因为它是 Pool() 运行 2 次的信息。

但是,如果您在同一个文件夹中运行 2 次函数,那么它可能会运行 2 次相同的任务。如果您将运行 5 次,那么它可能会运行 5 次相同的任务。不知道有没有必要。


至于Ctrl+C,我在 *** 上找到了Catch Ctrl+C / SIGINT and exit multiprocesses gracefully in python,但我不知道它是否能解决您的问题。

【讨论】:

再次感谢您,并为您没有在 Google 上搜索 CTRL+C 查询表示歉意 - 我不应该这么懒惰!我无法使用 Process 方法让它工作,但池和映射一个运行良好。 CTRL+C 看起来需要一些阅读和进一步的工作,但希望我能很快让它工作。谢谢!

以上是关于没有参数/可迭代的函数的多处理池?的主要内容,如果未能解决你的问题,请参考以下文章

多处理:将数组列表传递(和迭代)到池映射

如何将仅关键字参数应用于多处理池中的函数?

map,filter

多处理:池:直接迭代器或使用变量来存储迭代器

python 11 函数名 迭代器

如何使用多处理池读取文件