使用 asyncio 等待子进程的结果

Posted

技术标签:

【中文标题】使用 asyncio 等待子进程的结果【英文标题】:Using asyncio to wait for results from subprocess 【发布时间】:2020-09-07 18:56:13 【问题描述】:

我的 Python 脚本包含一个循环,该循环使用 subprocess 在脚本之外运行命令。每个子进程都是独立的。如果出现错误,我会监听返回的消息;我不能忽略子流程的结果。这是没有 asyncio 的脚本(我已经用 sleep 替换了计算量很大的调用):

from subprocess import PIPE  # https://docs.python.org/3/library/subprocess.html
import subprocess

def go_do_something(index: int) -> None:
    """
    This function takes a long time
    Nothing is returned
    Each instance is independent
    """
    process = subprocess.run(["sleep","2"],stdout=PIPE,stderr=PIPE,timeout=20)
    stdout = process.stdout.decode("utf-8")
    stderr = process.stderr.decode("utf-8")
    if "error" in stderr:
        print("error for "+str(index))
    return

def my_long_func(val: int) -> None:
    """
    This function contains a loop
    Each iteration of the loop calls a function
    Nothing is returned
    """
    for index in range(val):
        print("index = "+str(index))
        go_do_something(index)

# run the script
my_long_func(3) # launch three tasks

我想我可以使用asyncio 来加快此活动,因为 Python 脚本正在等待外部 subprocess 完成。我认为threadingmultiprocessing 不是必需的,尽管它们也可以加快执行速度。使用任务队列(例如 Celery)是另一种选择。

我尝试实现 asyncio 方法,但由于以下尝试不会改变整体执行时间,因此遗漏了一些东西:

import asyncio
from subprocess import PIPE  # https://docs.python.org/3/library/subprocess.html
import subprocess


async def go_do_something(index: int) -> None:
    """
    This function takes a long time
    Nothing is returned
    Each instance is independent
    """
    process = subprocess.run(["sleep","2"],stdout=PIPE,stderr=PIPE,timeout=20)
    stdout = process.stdout.decode("utf-8")
    stderr = process.stderr.decode("utf-8")
    if "error" in stderr:
        print("error for "+str(index))
    return

def my_long_func(val: int) -> None:
    """
    This function contains a loop
    Each iteration of the loop calls a function
    Nothing is returned
    """
    # https://docs.python.org/3/library/asyncio-eventloop.html
    loop = asyncio.get_event_loop()
    tasks = []
    for index in range(val):
        task = go_do_something(index)
        tasks.append(task)
    # https://docs.python.org/3/library/asyncio-task.html
    tasks = asyncio.gather(*tasks)
    loop.run_until_complete(tasks)
    loop.close()
    return

my_long_func(3) # launch three tasks

如果我想监控每个subprocess 的输出,但不等待每个subprocess 运行,我可以从asyncio 中受益吗?还是这种情况需要multiprocessing 或 Celery 之类的东西?

【问题讨论】:

【参考方案1】:

尝试使用asyncio 而不是subprocess 执行命令。

定义一个run()函数:

import asyncio

async def run(cmd: str):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stderr=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE
    )

    stdout, stderr = await proc.communicate()

    print(f'[cmd!r exited with proc.returncode]')
    if stdout:
        print(f'[stdout]\nstdout.decode()')
    if stderr:
        print(f'[stderr]\nstderr.decode()')

然后你可以像调用任何async 函数一样调用它:

asyncio.run(run('sleep 2'))

#=>

['sleep 2' exited with 0]

示例取自官方documentation。也可以here.

【讨论】:

感谢您向我指出该文档;我不知道这种整合。当我尝试在我的案例中采用这个例子时,我失败了。具体来说,我尝试将async def run(cmd) 添加到上面的第一个示例中,将await run("sleep 2") 放在go_do_something(index) 中。这会导致一条错误消息指出 await run("sleep 2")SyntaxError: invalid syntax 这是我根据您的建议尝试的方法 -- pastebin.com/AU9YgbGG【参考方案2】:

@ronginat 将我指向https://asyncio.readthedocs.io/en/latest/subprocess.html,我能够适应我正在寻找的情况:

import asyncio

async def run_command(*args):
    # Create subprocess
    process = await asyncio.create_subprocess_exec(
        *args,
        # stdout must a pipe to be accessible as process.stdout
        stdout=asyncio.subprocess.PIPE)
    # Wait for the subprocess to finish
    stdout, stderr = await process.communicate()
    # Return stdout
    return stdout.decode().strip()

async def go_do_something(index: int) -> None:
    print('index=',index)
    res = await run_command('sleep','2')
    return res

def my_long_func(val: int) -> None:
    task_list = []
    for indx in range(val):
        task_list.append( go_do_something(indx) )
    loop = asyncio.get_event_loop()
    commands = asyncio.gather(*task_list)
    reslt = loop.run_until_complete(commands)
    print(reslt)
    loop.close()

my_long_func(3) # launch three tasks

即使有三个持续时间为 2 秒的睡眠,总执行时间也刚刚超过 2 秒。我从每个子进程中获取标准输出。

【讨论】:

以上是关于使用 asyncio 等待子进程的结果的主要内容,如果未能解决你的问题,请参考以下文章

如何在asyncio python中使用子进程模块限制并发进程数

如何使用 asyncio 从使用 SubprocessProtocol 的子进程中读取并在任意点终止该子进程?

进程管理

僵尸进程和孤儿进程

Linux——进程控制(创建终止等待程序替换)

Linux——进程控制(创建终止等待程序替换)