使用 asyncio 等待子进程的结果
Posted
技术标签:
【中文标题】使用 asyncio 等待子进程的结果【英文标题】:Using asyncio to wait for results from subprocess 【发布时间】:2020-09-07 18:56:13 【问题描述】:我的 Python 脚本包含一个循环,该循环使用 subprocess
在脚本之外运行命令。每个子进程都是独立的。如果出现错误,我会监听返回的消息;我不能忽略子流程的结果。这是没有 asyncio 的脚本(我已经用 sleep
替换了计算量很大的调用):
from subprocess import PIPE # https://docs.python.org/3/library/subprocess.html
import subprocess
def go_do_something(index: int) -> None:
"""
This function takes a long time
Nothing is returned
Each instance is independent
"""
process = subprocess.run(["sleep","2"],stdout=PIPE,stderr=PIPE,timeout=20)
stdout = process.stdout.decode("utf-8")
stderr = process.stderr.decode("utf-8")
if "error" in stderr:
print("error for "+str(index))
return
def my_long_func(val: int) -> None:
"""
This function contains a loop
Each iteration of the loop calls a function
Nothing is returned
"""
for index in range(val):
print("index = "+str(index))
go_do_something(index)
# run the script
my_long_func(3) # launch three tasks
我想我可以使用asyncio
来加快此活动,因为 Python 脚本正在等待外部 subprocess
完成。我认为threading
或multiprocessing
不是必需的,尽管它们也可以加快执行速度。使用任务队列(例如 Celery)是另一种选择。
我尝试实现 asyncio
方法,但由于以下尝试不会改变整体执行时间,因此遗漏了一些东西:
import asyncio
from subprocess import PIPE # https://docs.python.org/3/library/subprocess.html
import subprocess
async def go_do_something(index: int) -> None:
"""
This function takes a long time
Nothing is returned
Each instance is independent
"""
process = subprocess.run(["sleep","2"],stdout=PIPE,stderr=PIPE,timeout=20)
stdout = process.stdout.decode("utf-8")
stderr = process.stderr.decode("utf-8")
if "error" in stderr:
print("error for "+str(index))
return
def my_long_func(val: int) -> None:
"""
This function contains a loop
Each iteration of the loop calls a function
Nothing is returned
"""
# https://docs.python.org/3/library/asyncio-eventloop.html
loop = asyncio.get_event_loop()
tasks = []
for index in range(val):
task = go_do_something(index)
tasks.append(task)
# https://docs.python.org/3/library/asyncio-task.html
tasks = asyncio.gather(*tasks)
loop.run_until_complete(tasks)
loop.close()
return
my_long_func(3) # launch three tasks
如果我想监控每个subprocess
的输出,但不等待每个subprocess
运行,我可以从asyncio
中受益吗?还是这种情况需要multiprocessing
或 Celery 之类的东西?
【问题讨论】:
【参考方案1】:尝试使用asyncio
而不是subprocess
执行命令。
定义一个run()
函数:
import asyncio
async def run(cmd: str):
proc = await asyncio.create_subprocess_shell(
cmd,
stderr=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
print(f'[cmd!r exited with proc.returncode]')
if stdout:
print(f'[stdout]\nstdout.decode()')
if stderr:
print(f'[stderr]\nstderr.decode()')
然后你可以像调用任何async
函数一样调用它:
asyncio.run(run('sleep 2'))
#=>
['sleep 2' exited with 0]
示例取自官方documentation。也可以here.
【讨论】:
感谢您向我指出该文档;我不知道这种整合。当我尝试在我的案例中采用这个例子时,我失败了。具体来说,我尝试将async def run(cmd)
添加到上面的第一个示例中,将await run("sleep 2")
放在go_do_something(index)
中。这会导致一条错误消息指出 await run("sleep 2")
是 SyntaxError: invalid syntax
这是我根据您的建议尝试的方法 -- pastebin.com/AU9YgbGG【参考方案2】:
@ronginat 将我指向https://asyncio.readthedocs.io/en/latest/subprocess.html,我能够适应我正在寻找的情况:
import asyncio
async def run_command(*args):
# Create subprocess
process = await asyncio.create_subprocess_exec(
*args,
# stdout must a pipe to be accessible as process.stdout
stdout=asyncio.subprocess.PIPE)
# Wait for the subprocess to finish
stdout, stderr = await process.communicate()
# Return stdout
return stdout.decode().strip()
async def go_do_something(index: int) -> None:
print('index=',index)
res = await run_command('sleep','2')
return res
def my_long_func(val: int) -> None:
task_list = []
for indx in range(val):
task_list.append( go_do_something(indx) )
loop = asyncio.get_event_loop()
commands = asyncio.gather(*task_list)
reslt = loop.run_until_complete(commands)
print(reslt)
loop.close()
my_long_func(3) # launch three tasks
即使有三个持续时间为 2 秒的睡眠,总执行时间也刚刚超过 2 秒。我从每个子进程中获取标准输出。
【讨论】:
以上是关于使用 asyncio 等待子进程的结果的主要内容,如果未能解决你的问题,请参考以下文章
如何在asyncio python中使用子进程模块限制并发进程数