python concurrent.futures 使用子进程,运行几个python脚本

Posted

技术标签:

【中文标题】python concurrent.futures 使用子进程,运行几个python脚本【英文标题】:Python concurrent.futures using subprocess, running several python script 【发布时间】:2018-08-22 07:35:44 【问题描述】:

我想使用 concurrent.futures 同时运行多个 python 脚本。 我的代码的串行版本会在文件夹中查找特定的 python 文件并执行它。

import re
import os
import glob
import re
from glob import glob
import concurrent.futures as cf

FileList = [];
import time
FileList = [];
start_dir = os.getcwd();
pattern   = "Read.py"

for dir,_,_ in os.walk(start_dir):
    FileList.extend(glob(os.path.join(dir,pattern))) ;

FileList

i=0
for file in FileList:
    dir=os.path.dirname((file))
    dirname1 = os.path.basename(dir) 
    print(dirname1)
    i=i+1
    Str='python '+ file
    print(Str)
    completed_process = subprocess.run(Str)`

对于我的代码的并行版本:

    def Python_callback(future):
    print(future.run_type, future.jid)
    return "One Folder finished executing"

def Python_execute():
    from concurrent.futures import ProcessPoolExecutor as Pool
    args = FileList
    pool = Pool(max_workers=1)
    future = pool.submit(subprocess.call, args, shell=1)
    future.run_type = "run_type"
    future.jid = FileList
    future.add_done_callback(Python_callback)
    print("Python executed")

if __name__ == '__main__':
    import subprocess
    Python_execute()

问题是我不确定如何将 FileList 的每个元素传递给单独的 cpu

提前感谢您的帮助

【问题讨论】:

【参考方案1】:

最小的变化是对每个元素使用一次submit,而不是对整个列表使用一次:

futures = []
for file in FileList:
    future = pool.submit(subprocess.call, file, shell=1)
    future.blah blah
    futures.append(future)

futures 列表仅在您想对期货做一些事情时才需要——等待它们完成、检查它们的返回值等。

同时,您正在使用max_workers=1 显式创建池。毫不奇怪,这意味着您只会获得 1 个工作子进程,因此它最终会等待一个子进程完成,然后再获取下一个子进程。如果您想实际同时运行它们,请删除 max_workers 并让它默认为每个内核一个(或传递 max_workers=8 或其他不是 1 的数字,如果您有充分的理由覆盖默认值)。


虽然我们正在这样做,但有很多方法可以简化您正在做的事情:

你真的需要multiprocessing吗?如果您需要与每个子进程进行通信,那么在单个线程中执行此操作可能会很痛苦,但线程或asyncio 将与此处的进程一样工作。 更重要的是,看起来您实际上不需要任何东西,只需启动进程并等待它完成,这可以通过简单的同步代码完成。 为什么要构建一个字符串并使用shell=1 而不是只传递一个列表而不使用shell?使用 shell 会产生不必要的开销、安全问题和调试烦恼。 你真的不需要jid 在每个future 上——它只是你所有调用字符串的列表,没有用处。可能更有用的是某种标识符,或者子进程返回码,或者……可能还有很多其他的东西,但它们都是可以通过读取 subprocess.call 的返回值或一个简单的包装器来完成的。 您也确实不需要回调。如果您只是将所有期货收集在一个列表中并as_completed 它,您可以更简单地打印结果。 如果您同时执行上述两项操作,您将只剩下循环内的 pool.submit — 这意味着您可以将整个循环替换为 pool.map。 您很少需要或想要混合使用os.walkglob。当您实际上有一个 glob 模式时,将fnmatch 应用于来自os.walkfiles 列表。但是在这里,您只是在每个目录中寻找一个特定的文件名,所以实际上,您需要过滤的只是file == 'Read.py'。 您没有在循环中使用i。但如果您确实需要它,最好使用for i, file in enumerate(FileList):,而不是使用for file in FileList: 并手动增加i

【讨论】:

以上是关于python concurrent.futures 使用子进程,运行几个python脚本的主要内容,如果未能解决你的问题,请参考以下文章

python concurrent.futures

python的multiprocessing和concurrent.futures有啥区别?

Python:Concurrent.Futures 错误 [TypeError:'NoneType' 对象不可调用]

python并发模块之concurrent.futures

python简单粗暴多线程之concurrent.futures

Python:inotify、concurrent.futures - 如何添加现有文件