Python 子进程、通信和多处理/多线程
Posted
技术标签:
【中文标题】Python 子进程、通信和多处理/多线程【英文标题】:Python subprocess, communicate, and multiprocessing/multithreading 【发布时间】:2021-07-16 02:09:56 【问题描述】:我有一个执行已编译 fortran 模块的脚本。然后必须以文件名的形式将输入传递给该进程,并且必须按下 Enter 以启动处理。我无法真正控制 fortran 可执行文件的性质。
我正在使用 subprocess 和 communicate 从 python 处理这个,它运行良好。问题是我需要处理 100 到 1000 个文件,并且按顺序处理它们很慢。虽然我预计最终会在 HDD 上遇到 I/O 瓶颈,但目前的执行时间远未接近此限制。
我试图简单地将产生子进程的方法包装在多线程 ThreadPoolExecutor 中,但发现实际上只有一小部分文件得到处理(大约每 20 个,但会有所不同),其余文件已创建但为空(每个都是 0 kb 并且没有内容 - 就好像产生它们的子进程在创建句柄后过早地被杀死)
我尝试过使用带有 input 参数的 subprocess.run、自定义 os.pipe、用 TemporaryFile 作为管道、先生成所有子进程再用多线程调用 communicate,以及在生成进程之后、通信之前手动加入延迟,但全都没有用。
如果我首先生成子进程,我可以通过检查来确认每个子进程的 stdout、stdin 和 stderr 管道都有一个唯一标识符。
这是调用fortran模块的代码
def run_CEA2(fName_prefix):
    """Run FCEA2.exe once and feed it *fName_prefix* on stdin.

    The executable is started through the shell in the current working
    directory and receives the file name prefix followed by a newline (the
    "press Enter" step).

    Args:
        fName_prefix: Input file name (without extension) to hand to FCEA2.exe.

    Returns:
        True when stdout does not contain 'DOES NOT EXIST'.

    Raises:
        RuntimeError: If stdout contains 'DOES NOT EXIST'.
    """
    print(fName_prefix)
    # The '{}' placeholder is required: without it str.format() discards the
    # argument and the executable only ever receives a bare newline.
    CEA_call = subprocess.run('FCEA2.exe', input='{}\n'.format(fName_prefix), encoding='ascii',
                              stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                              shell=True, cwd=None, check=False)
    # NOTE(review): stderr is captured but never inspected here, so any
    # message the executable writes to stderr is silently lost.
    if 'DOES NOT EXIST' in CEA_call.stdout:
        raise RuntimeError('\nERROR: Stdout returned by run_CEA()\n'+'\t'.join([line+'\n' for line in CEA_call.stdout.split('\n')]))
    return True
这是异步调用上述方法的代码
import concurrent.futures
def threadedRun(fName):
    """Log the file being processed, then run FCEA2.exe on it via run_CEA().

    Args:
        fName: Input file name prefix passed straight through to run_CEA().
    """
    # The placeholder was missing, so the file name never appeared in the log.
    print('\tExecuting file {}'.format(fName))
    run_CEA(fName)
# Fan the files out over 8 worker threads and wait for all of them.
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    # executor.map() only re-raises a worker's exception when its result is
    # consumed; the results were never read, so every failure was dropped
    # silently. Submitting explicitly lets each failure be reported while the
    # remaining files are still processed.
    pending = {executor.submit(threadedRun, fName): fName for fName in fNames}
    for future in concurrent.futures.as_completed(pending):
        error = future.exception()
        if error is not None:
            print('\tERROR: {} failed: {}'.format(pending[future], error))
print('\tDone.')
这是一个使用 Popen 和通信的 run_CEA 版本
def run_CEA(fName_prefix):
    """Run FCEA2.exe through Popen/communicate, feeding *fName_prefix* on stdin.

    Args:
        fName_prefix: Input file name (without extension) to hand to FCEA2.exe.

    Returns:
        True when the decoded stdout does not contain 'DOES NOT EXIST'.

    Raises:
        RuntimeError: If the decoded stdout contains 'DOES NOT EXIST'.
    """
    print(fName_prefix)
    p = subprocess.Popen(['FCEA2.exe'], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE,shell=True)
    # The '{}' placeholder is required: without it str.format() discards the
    # argument and only a bare newline is written to the process.
    stdin_bytes = '{}\n'.format(fName_prefix).encode()
    # communicate() returns (stdout, stderr); only stdout is examined.
    return_str = p.communicate(input=stdin_bytes)[0].decode()
    if 'DOES NOT EXIST' in return_str:
        raise RuntimeError('\nERROR: Stdout returned by run_CEA()\n'+'\t'.join([line+'\n' for line in return_str.split('\n')]))
    return True
我不明白是什么导致生成的进程过早关闭。如上所述,我可以预先生成所有子进程,然后遍历一个列表并依次处理每个子进程。
当把 concurrent.futures 加进来之后,似乎信号发生了交叉,多个已生成的进程会同时被杀死。
有趣的是,当我仅使用 concurrent.futures 来处理预先填充的子进程列表时,行为是相同的。即使所有进程都已事先存在(而不是在通信和关闭进程的过程中即时生成),列表中仍然大约每 20 个进程才有一个产生输出。
【问题讨论】:
【参考方案1】:令人尴尬的是,这其实是一个 Fortran 问题:当我不再用管道捕获 stderr,而是让它直接输出到控制台时,问题就变得很明显了:
forrtl: severe (30): / 进程无法访问文件,因为它正被另一个进程使用。
正在使用的 Fortran 可执行文件不仅从二进制文件中读取,而且还使用写入权限锁定它,这意味着它不能被多个可执行文件实例同时调用。
为了在运行时解决这个问题,我生成了 n 个临时文件夹,每个文件夹都包含 Fortran 可执行文件及其依赖项的完整副本。然后在调用 subprocess.run 时使用 'cwd' 参数,让一组线程各自在自己的文件夹中处理文件。
如果您熟悉所谓的 NASA CEA 代码,下面的内容可能对您有用。为了完整起见,这里给出完整代码,供任何可能受益的人参考。
import os
import shutil
import subprocess
from threading import Thread, Lock, current_thread
import queue
import functools
import threading
def run_CEA(fName_prefix,working_folder=None):
    """Run the copy of FCEA2.exe located in *working_folder* on one input file.

    The executable is started with *working_folder* as its current directory
    and receives *fName_prefix* followed by a newline on stdin.

    Args:
        fName_prefix: Input file name (without extension) handed to FCEA2.exe.
        working_folder: Folder containing FCEA2.exe. Defaults to the current
            working directory when None.

    Returns:
        1 on success.

    Raises:
        RuntimeError: If stdout contains 'DOES NOT EXIST', or if the process
            wrote anything to stderr.
    """
    if working_folder is None:
        # os.path.join(None, ...) raises TypeError, so the declared default
        # was unusable; fall back to the current directory instead.
        working_folder = os.getcwd()
    CEA_str = os.path.abspath(os.path.join(working_folder,'FCEA2.exe'))
    # The '{}' placeholder is required: without it str.format() discards the
    # argument and the executable only ever receives a bare newline.
    CEA_call = subprocess.run(CEA_str, input='{}\n'.format(fName_prefix),
                              encoding='ascii', stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                              shell=False, cwd=working_folder, check=False)
    if 'DOES NOT EXIST' in CEA_call.stdout:
        raise RuntimeError('FCEA2.exe could not find specified input file\n'
                           +'\t'.join([line+'\n' for line in CEA_call.stdout.split('\n')]))
    if CEA_call.stderr:
        raise RuntimeError('Error occured in call to FCEA2.exe\n'
                           +'\t'.join([line+'\n' for line in CEA_call.stderr.split('\n')]))
    return 1
def synchronized(lock):
    """Build a decorator that runs the wrapped callable while holding *lock*.

    Args:
        lock: Context-manager lock entered around every call.

    Returns:
        A decorator; the decorated function keeps its metadata via
        functools.wraps and returns whatever the original returns.
    """
    def decorate(func):
        @functools.wraps(func)
        def guarded(*args, **kwargs):
            with lock:
                result = func(*args, **kwargs)
            return result
        return guarded
    return decorate
class CEA_Queue(queue.Queue):
    """Thread-backed job queue that runs FCEA2.exe over every ``*.inp`` file.

    Based on the template provided by Shashwat Kumar found @
    https://medium.com/@shashwat_ds/a-tiny-multi-threaded-job-queue-in-30-lines-of-python-a344c3f3f7f0

    Each worker thread gets its own sub-folder of ``run_folder`` holding a
    copy of every file in ``exe_folder``, and runs the executable from there.

    The four folder paths are CLASS attributes: ``set_dirs`` (called from
    ``__init__``) changes them for every instance.
    """
    inp_folder = os.path.abspath('.//inp_files')
    out_folder = os.path.abspath('.//out_files')
    run_folder = os.path.abspath('.//workers')
    exe_folder = os.path.abspath('.//cea_files')
    # Files that must survive when exe_folder is cleaned.
    req_cea_files = ["FCEA2.exe",
                     "b1b2b3.exe",
                     "syntax.exe",
                     "thermo.lib",
                     "trans.lib"]
    # Shared by all instances; guards the progress counters in worker threads.
    lock = Lock()

    @classmethod
    def test_dirs_cls(cls):
        """Print the four folder paths as seen through the class."""
        print('test_dirs_cls:')
        for dirname in ('inp_folder','out_folder','run_folder','exe_folder'):
            print(dirname,':',getattr(cls,dirname))

    def test_dirs_self(self):
        """Print the four folder paths as seen through this instance."""
        print('test_dirs_self:')
        for dirname in ('inp_folder','out_folder','run_folder','exe_folder'):
            print(dirname,':',getattr(self,dirname))

    @staticmethod
    def clean_folder(target,ignore_list=()):
        """Delete every file and sub-folder of *target* not named in *ignore_list*.

        Does nothing when *target* is not an existing directory.
        """
        if not os.path.isdir(target):
            return
        for fName in os.listdir(target):
            if fName in ignore_list:
                continue
            fPath = os.path.join(target,fName)
            if os.path.isfile(fPath):
                os.remove(fPath)
            elif os.path.isdir(fPath):
                shutil.rmtree(fPath)

    @classmethod
    def setup_folders(cls):
        """Create or EMPTY the out/inp/run folders and prune the exe folder.

        Existing content of out_folder, inp_folder and run_folder is deleted;
        exe_folder keeps only the files listed in ``req_cea_files``.

        Raises:
            ValueError: If exe_folder does not exist.
        """
        for folder in (cls.out_folder,cls.inp_folder,cls.run_folder):
            if not os.path.isdir(folder):
                os.mkdir(folder)
            else:
                cls.clean_folder(folder)
        if not os.path.isdir(cls.exe_folder):
            raise ValueError("Cannot find exe folder at:\n\t{}".format(cls.exe_folder))
        cls.clean_folder(cls.exe_folder,ignore_list=cls.req_cea_files)

    @classmethod
    def cleanup(cls):
        """Empty run_folder and move every ``*.out`` file from inp_folder to out_folder.

        Returns:
            List of destination paths of the files that were moved. Files
            that could not be moved are reported with a warning and skipped.
        """
        cls.clean_folder(cls.run_folder)
        out_files = []
        for fName in os.listdir(cls.inp_folder):
            if not fName.endswith('.out'):
                continue
            destination = os.path.join(cls.out_folder,fName)
            try:
                shutil.move(os.path.join(cls.inp_folder,fName),destination)
                out_files.append(destination)
            except Exception as exc:
                print('WARNING: Could not move *.out file\n{}\n{}'.format(fName,exc))
        return out_files

    @classmethod
    def gather_inputs(cls):
        """Return the full paths of all ``*.inp`` files in inp_folder."""
        return [os.path.join(cls.inp_folder,fName)
                for fName in os.listdir(cls.inp_folder)
                if fName.endswith('.inp')]

    @classmethod
    def set_dirs(cls,inp_folder=None,out_folder=None,
                 run_folder=None,exe_folder=None):
        """Override any of the class-level folder paths; None leaves a path unchanged."""
        if inp_folder is not None:
            cls.inp_folder = os.path.abspath(inp_folder)
        if out_folder is not None:
            cls.out_folder = os.path.abspath(out_folder)
        if run_folder is not None:
            cls.run_folder = os.path.abspath(run_folder)
        if exe_folder is not None:
            cls.exe_folder = os.path.abspath(exe_folder)

    def __init__(self, num_workers=1,inp_folder=None,out_folder=None,
                 run_folder=None,exe_folder=None):
        """Configure the folders, prepare them on disk and reset the counters.

        Note that this calls ``setup_folders``, which deletes existing content
        of the input, output and worker folders.
        """
        queue.Queue.__init__(self)
        self.set_dirs(inp_folder,out_folder,run_folder,exe_folder)
        self.setup_folders()
        self.num_workers = num_workers
        self.n_task = 0
        self.n_complete = 0
        # Minimum progress increase (in percent) between two progress messages.
        self.update_every = 10.
        self.last_update = 0

    def add_task(self, fName):
        """Queue one input file name prefix for processing."""
        self.put(fName)

    def schedule_tasks(self):
        """Queue every ``*.inp`` file (extension removed) and reset the counters.

        Returns:
            List of the full input file paths that were scheduled.
        """
        inp_files = self.gather_inputs()
        for fName in inp_files:
            # splitext removes only the trailing extension; splitting on
            # '.inp' would truncate the path at the first '.inp' anywhere in
            # it (e.g. inside a directory name).
            self.add_task(os.path.splitext(fName)[0])
        self.n_task = len(inp_files)
        self.n_complete = 0
        self.last_update = 0
        return inp_files

    def progress(self):
        """Return the completed share of scheduled tasks in percent.

        Raises:
            ZeroDivisionError: If no tasks have been scheduled (n_task == 0).
        """
        return (self.n_complete/self.n_task)*100

    def start_workers(self):
        """Create one folder and one daemon thread per worker.

        Each worker folder receives a copy of every file in exe_folder. The
        folder path is attached to the thread object so ``worker`` can look
        it up through ``current_thread()``.
        """
        self.worker_threads = []
        for i in range(self.num_workers):
            worker_folder = os.path.join(self.run_folder,str(i))
            os.mkdir(worker_folder)
            for fNameExe in os.listdir(self.exe_folder):
                shutil.copy(os.path.join(self.exe_folder,fNameExe),
                            os.path.join(worker_folder,fNameExe))
            t = Thread(target=self.worker)
            t.daemon = True
            t.worker_folder = worker_folder
            t.start()
            self.worker_threads.append(t)

    def _record_completion(self):
        """Count one finished task and print progress when a threshold is crossed."""
        with self.lock:
            self.n_complete+=1
            current_progress = self.progress()
            if (self.last_update==0 or current_progress==100. or
                    current_progress-self.last_update>=self.update_every):
                print('\tCurrent progress: {:>6.2f}%'.format(current_progress))
                self.last_update = current_progress

    def worker(self):
        """Thread body: take tasks from the queue forever and run FCEA2.exe on each.

        Failures of a single task are reported and do not stop the worker.
        """
        worker_folder = current_thread().worker_folder
        while True:
            # Fetch outside the try block so the error message below can
            # always refer to the task that failed.
            fName = self.get()
            try:
                rel_path = os.path.relpath(fName,worker_folder)
                run_CEA(rel_path,worker_folder)
            except Exception as exc:
                print('ERROR: Worker failed on task\n\tFolder:{}\n\tFile:{}\n\t{}'.format(worker_folder,fName,exc))
            finally:
                # Update the counters BEFORE task_done(): once the last
                # task_done() fires, join() returns and the caller moves on,
                # so the final progress message could otherwise be lost.
                try:
                    self._record_completion()
                finally:
                    self.task_done()

    def run(self):
        """Schedule all inputs, process them with the workers and collect outputs.

        Returns:
            List of paths of the ``*.out`` files moved to out_folder.
        """
        self.schedule_tasks()
        self.start_workers()
        self.join()
        return self.cleanup()

    def tests(self,n):
        """Write *n* copies of a sample CEA input file and process them.

        Returns:
            The list returned by ``run``.
        """
        inp_str = """! EXAMPLE 1
! (a) Assigned-temperature-and-pressure problem (tp).
! (b) Reactants are H2 and Air. Since "exploded ll formulas are not given,
! these formulas will be taken from the thermodynamic data library,
! thermo. lib.
! (c) Calculations are for two equivalence ratios (r,eq.ratio =1,1.5) .
! (d) Assigned pressures are I, 0.1, and 0.01 atm (p(atm)=l, .1, .01).
! (d) Assigned temperatures are 3000 and 2000 K (t(k)=3000,2000).
! (f) 'only' dataset is used to restrict possible products.
! (g) Energy units in the final tables are in calories (calories).
problem case=Example-1 tp p(atm)=1,.1,.01, t(k)=3000,2000,
r,eq.ratio=1,1.5
reac
fuel= H2 moles = 1.
oxid= Air moles = 1.
only Ar C CO CO2 H H2 H2O HNO HO2 HNO2 HNO3 N NH
NO N2 N2O3 O O2 OH O3
output calories
end
"""
        self.setup_folders()
        for i in range(n):
            # Zero-padded index; without the placeholder every file would be
            # written under the same name.
            fName = 'test{:0>4}'.format(i)
            fName = os.path.abspath(os.path.join(self.inp_folder,fName+'.inp'))
            with open(fName,'w') as f:
                f.write(inp_str)
        return self.run()
if __name__ == "__main__":
    # Benchmark: generate 10,000 sample inputs and process them on 12 worker
    # threads, reporting the wall-clock time. (The redundant `if True:`
    # wrapper was dropped; it had no effect.)
    import time
    start_time = time.time()
    Q = CEA_Queue(12)
    out_files = Q.tests(10_000)
    end_time = time.time()
    # The placeholder was missing, so the elapsed time was never printed.
    print('Processing took {:5.2f}'.format(end_time-start_time))
在我的 8 核机器上,最佳点是大约 12 个线程。下面是一个示例曲线,将运行时间与处理问题工作负载的线程数进行比较。
【讨论】:
以上是关于Python 子进程、通信和多处理/多线程的主要内容,如果未能解决你的问题,请参考以下文章