Python 子进程、通信和多处理/多线程

Posted

技术标签:

【中文标题】Python 子进程、通信和多处理/多线程【英文标题】:Python subprocess, communicate, and multiprocessing/multithreading 【发布时间】:2021-07-16 02:09:56 【问题描述】:

我有一个执行已编译 fortran 模块的脚本。然后必须以文件名的形式将输入传递给该进程,并且必须按下 Enter 以启动处理。我无法真正控制 fortran 可执行文件的性质。

我正在使用子进程和通信从 python 处理这个,它运行良好。问题是我需要处理 100 到 1000 个文件,并且按顺序处理它们很慢。虽然我预计我最终会在 HDD 当前遇到 I/O 瓶颈,但执行时间远未接近此限制。

我试图简单地将产生子进程的方法包装在多线程 ThreadPoolExecutor 中,但发现实际上只有一小部分文件得到处理(大约每 20 个,但会有所不同),其余文件已创建但为空(每个都是 0 kb 并且没有内容 - 就好像产生它们的子进程在创建句柄后过早地被杀死)

我尝试使用带有输入参数的 subprocess.run,自定义 os.pipes,TemporaryFile 作为管道,首先生成所有子进程,然后进行多线程调用进行通信,以及在通信之前生成进程后手动延迟,全部为否有用。

如果我首先生成子进程,我可以通过检查来确认每个子进程的 stdout、stdin 和 stderr 管道都有一个唯一标识符。

这是调用fortran模块的代码

def run_CEA2(fName_prefix):
    print(fName_prefix)
    CEA_call = subprocess.run('FCEA2.exe', input='\n'.format(fName_prefix), encoding='ascii', 
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            shell=True, cwd=None, check=False)
    if 'DOES NOT EXIST' in CEA_call.stdout:
        raise RuntimeError('\nERROR: Stdout returned by run_CEA()\n'+'\t'.join([line+'\n' for line in CEA_call.stdout.split('\n')]))
    else:
        return True 

这是异步调用上述方法的代码

import concurrent.futures
def threadedRun(fName):
    print('\tExecuting file '.format(fName))
    run_CEA(fName)      
    
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    executor.map(threadedRun, fNames)            
print('\tDone.')

这是一个使用 Popen 和通信的 run_CEA 版本

def run_CEA(fName_prefix):
    print(fName_prefix)
    p = subprocess.Popen(['FCEA2.exe'], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE,shell=True)
    return_str =  p.communicate(input=('\n'.format(fName_prefix)).encode())[0].decode()
    if 'DOES NOT EXIST' in return_str:
        raise RuntimeError('\nERROR: Stdout returned by run_CEA()\n'+'\t'.join([line+'\n' for line in return_str.split('\n')]))
    else:
        return True  

我不明白是什么导致生成的进程过早关闭。如上所述,我可以预先生成所有子进程,然后遍历一个列表并依次处理每个子进程。

当向混合中添加并发期货时,似乎信号被交叉并且多个衍生的进程一次被杀死。

有趣的是,当我仅使用并发期货来处理预先填充的子流程列表时,行为是相同的。不管已经存在的所有进程(不是在发生通信和关闭进程时即时生成),列表中大约每 20 个进程都会产生输出。

【问题讨论】:

【参考方案1】:

令人尴尬的是,这个问题是 Fortran 问题,当我停止管道 stderr 并允许它传递到控制台时,这个问题变得很明显:

forrtl:severe (30): / 进程无法访问文件,因为它正在被 被另一个进程使用。

正在使用的 Fortran 可执行文件不仅从二进制文件中读取,而且还使用写入权限锁定它,这意味着它不能被多个可执行文件实例同时调用。

为了在运行时解决这个问题,我生成了 n 个临时文件夹,每个文件夹都包含 Fortran 可执行文件及其依赖项的完整副本。然后在调用 subprocess run 时使用 'cwd' 参数来拥有一堆线程并处理文件。

如果您熟悉所谓的 NASA CEA 代码。为了完整起见,下面是任何可能受益的人的代码。

import os
import shutil
import subprocess
from threading import Thread, Lock, current_thread
import queue 
import functools
import threading

def run_CEA(fName_prefix,working_folder=None):
    CEA_str = os.path.abspath(os.path.join(working_folder,'FCEA2.exe'))
    CEA_call = subprocess.run(CEA_str, input='\n'.format(fName_prefix),
                              encoding='ascii', stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                              shell=False, cwd=working_folder, check=False)        
    if 'DOES NOT EXIST' in CEA_call.stdout:
        raise RuntimeError('FCEA2.exe could not find specified input file\n'
            +'\t'.join([line+'\n' for line in CEA_call.stdout.split('\n')]))
    elif CEA_call.stderr:
        raise RuntimeError('Error occured in call to FCEA2.exe\n'
            +'\t'.join([line+'\n' for line in CEA_call.stderr.split('\n')]))    
    else:
        return 1

def synchronized(lock):
    """ Synchronization decorator """
    def wrap(f):
        @functools.wraps(f)
        def newFunction(*args, **kw):
            with lock:
                return f(*args, **kw)
        return newFunction
    return wrap
        
class CEA_Queue(queue.Queue):
    """ Based on template at provided by Shashwat Kumar found @
    https://medium.com/@shashwat_ds/a-tiny-multi-threaded-job-queue-in-30-lines-of-python-a344c3f3f7f0"""

    inp_folder = os.path.abspath('.//inp_files')
    out_folder = os.path.abspath('.//out_files')    
    run_folder = os.path.abspath('.//workers')
    exe_folder = os.path.abspath('.//cea_files')
    
    req_cea_files = ["FCEA2.exe",
                     "b1b2b3.exe",
                     "syntax.exe",
                     "thermo.lib",
                     "trans.lib"]
    lock = Lock()

    @classmethod
    def test_dirs_cls(cls):
        print('test_dirs_cls:')
        for dirname in ('inp_folder','out_folder','run_folder','exe_folder'):
            print(dirname,':',getattr(cls,dirname))

    def test_dirs_self(self):
        print('test_dirs_self:')
        for dirname in ('inp_folder','out_folder','run_folder','exe_folder'):
            print(dirname,':',getattr(self,dirname))


    @staticmethod
    def clean_folder(target,ignore_list=[]):
        if os.path.isdir(target):                
            for fName in os.listdir(target):
                fPath = os.path.join(target,fName)        
                if os.path.isfile(fPath) and not fName in ignore_list:
                    os.remove(fPath)
                elif os.path.isdir(fPath) and not fName in ignore_list:
                    shutil.rmtree(fPath)    
    
    @classmethod
    def setup_folders(cls):
        for folder in (cls.out_folder,cls.inp_folder,cls.run_folder):
            if not os.path.isdir(folder):
                os.mkdir(folder)
            else:
                cls.clean_folder(folder)
                
        if not os.path.isdir(cls.exe_folder):                                    
            raise ValueError("Cannot find exe folder at:\n\t".format(cls.exe_folder))    
        else:
            cls.clean_folder(cls.exe_folder,ignore_list=cls.req_cea_files)

    @classmethod
    def cleanup(cls):                 
        cls.clean_folder(cls.run_folder)  
        out_files = []
        for fName in os.listdir(cls.inp_folder):
            if '.out' == fName[-4:]:
                try:
                    shutil.move(os.path.join(cls.inp_folder,fName),
                                os.path.join(cls.out_folder,fName)) 
                    out_files.append(os.path.join(cls.out_folder,fName))
                except Exception as exc:
                    print('WARNING: Could not move *.out file\n\n'.format(fName,exc))
        return out_files
    
    @classmethod
    def gather_inputs(cls):
        inp_files = []
        for fName in os.listdir(cls.inp_folder):
            if '.inp' in fName[-4:]:                
                inp_files.append(os.path.join(cls.inp_folder,fName))
        return inp_files
    
    @classmethod
    def set_dirs(cls,inp_folder=None,out_folder=None,
                 run_folder=None,exe_folder=None):
        if not inp_folder is None:
            cls.inp_folder = os.path.abspath(inp_folder)
        if not out_folder is None:
            cls.out_folder = os.path.abspath(out_folder)
        if not run_folder is None:
            cls.run_folder = os.path.abspath(run_folder)
        if not exe_folder is None:
            cls.exe_folder = os.path.abspath(exe_folder)            
        
    def __init__(self, num_workers=1,inp_folder=None,out_folder=None,
                 run_folder=None,exe_folder=None):                        
        queue.Queue.__init__(self) 
        self.set_dirs(inp_folder,out_folder,run_folder,exe_folder)
        self.setup_folders()                        
        self.num_workers = num_workers       
        self.n_task = 0
        self.n_complete = 0
        self.update_every = 10.
        self.last_update = 0
                   
    def add_task(self, fName):
        self.put(fName)
            
    def schedule_tasks(self):
        inp_files = self.gather_inputs()
        for fName in inp_files:
            self.add_task(fName.split('.inp')[0])
        self.n_task = len(inp_files)
        self.n_complete = 0
        self.last_update = 0
        return inp_files
    
    def progress(self):
        return (self.n_complete/self.n_task)*100
        
    
    def start_workers(self):
        self.worker_threads = []
        for i in range(self.num_workers):            
            k = str(i)
            worker_folder =  os.path.join(self.run_folder,k)            
            try:
                os.mkdir(worker_folder)
                for fNameExe in os.listdir(self.exe_folder):
                    shutil.copy(os.path.join(self.exe_folder,fNameExe),os.path.join(worker_folder,fNameExe))                   
            except Exception as exc:
                raise exc                                                                      
            t = Thread(target=self.worker)
            t.daemon = True
            t.worker_folder = worker_folder
            t.start()
            self.worker_threads.append(t)
        
    def worker(self):
        while True:
            try:
                worker_folder = current_thread().worker_folder             
                fName = self.get()        
                rel_path = os.path.relpath(fName,worker_folder)
                run_CEA(rel_path,worker_folder)                                     
            except Exception as exc:
                print('ERROR: Worker failed on task\n\tFolder:\n\tFile:\n\t'.format(worker_folder,fName,exc))
            finally:
                self.task_done()
                with self.lock:
                    self.n_complete+=1
                    current_progress = self.progress()
                    if (self.last_update==0 or current_progress==100. or
                        current_progress-self.last_update>=self.update_every):                        
                        print('\tCurrent progress: :>6.2f%'.format(current_progress))
                        self.last_update = current_progress
    
    def run(self):        
        inp_files = self.schedule_tasks()
        self.start_workers() 
        self.join()
        out_files = self.cleanup()
        return out_files
    
    def tests(self,n):
        inp_str = """! EXAMPLE 1
        ! (a) Assigned-temperature-and-pressure problem (tp).
        ! (b) Reactants are H2 and Air. Since "exploded ll formulas are not given,
        !     these formulas will be taken from the thermodynamic data library,
        !     thermo. lib.
        ! (c) Calculations are for two equivalence ratios (r,eq.ratio =1,1.5) .
        ! (d) Assigned pressures are I, 0.1, and 0.01 atm (p(atm)=l, .1, .01).
        ! (d) Assigned temperatures are 3000 and 2000 K (t(k)=3000,2000).
        ! (f) 'only' dataset is used to restrict possible products.
        ! (g) Energy units in the final tables are in calories (calories).

        problem case=Example-1 tp p(atm)=1,.1,.01, t(k)=3000,2000,
                r,eq.ratio=1,1.5
        reac
        fuel= H2  moles = 1.
        oxid= Air moles = 1.
        only Ar C CO CO2 H H2 H2O HNO HO2 HNO2 HNO3 N NH
        NO N2 N2O3 O O2 OH O3
        output calories
        end
        """        
        self.setup_folders()     
        for i in range(n):
            fName = 'test:0>4'.format(i)
            fName = os.path.abspath(os.path.join(self.inp_folder,fName+'.inp'))    
            f = open(fName,'w')
            f.write(inp_str)
            f.close()            
        return self.run()

if __name__ == "__main__":
    if True:
        import time
        start_time = time.time()
        Q = CEA_Queue(12)
        out_files = Q.tests(10_000)
        end_time = time.time()
        print('Processing took :5.2f'.format(end_time-start_time))

在我的 8 核机器上,最佳点是大约 12 个线程。下面是一个示例曲线,将运行时间与处理问题工作负载的线程数进行比较。

【讨论】:

以上是关于Python 子进程、通信和多处理/多线程的主要内容,如果未能解决你的问题,请参考以下文章

java 多线程子线程唤醒主线程问题

什么是多线程,多进程?

多线程与多进程的比较

子进程子回溯

python Event对象队列和多进程基础

如何多线程(多进程)加速while循环(语言-python)?