连接两个以 asyncio.subprocess.create_subprocess_exec() 开始的进程

Posted

技术标签:

【中文标题】连接两个以 asyncio.subprocess.create_subprocess_exec() 开始的进程【英文标题】:Connect two processes started with asyncio.subprocess.create_subprocess_exec() 【发布时间】:2016-04-15 22:20:34 【问题描述】:

当使用老式 subprocess.Popen() API 启动两个进程时,我可以轻松地将一个进程的 standard out 连接到另一个进程的 standard in,从而在与使用| 连接命令时的 UNIX shell 相同:

from subprocess import Popen, PIPE

process_1 = Popen(['ls'], stdout = PIPE)
process_2 = Popen(['wc'], stdin = process_1.stdout)

process_1.wait()
process_2.wait()

当使用来自asyncio.subprocess.create_subprocess_exec()(或类似的)的异步 API 时,我怎样才能完成相同的操作?这是我尝试过的:

from asyncio.events import get_event_loop
from asyncio.subprocess import PIPE, create_subprocess_exec

async def main():
    process_1 = await create_subprocess_exec('ls', stdout = PIPE)
    process_2 = await create_subprocess_exec('wc', stdin = process_1.stdout)

    await process_1.wait()
    await process_2.wait()

get_event_loop().run_until_complete(main())

但是对create_subprocess_exec() 的第二次调用抱怨传递给stdin 的参数没有fileno(这是真的):

Traceback (most recent call last):
  File ".../test-async.py", line 11, in <module>
     get_event_loop().run_until_complete(main())
[...]
  File ".../test-async.py", line 6, in main
     process_2 = await create_subprocess_exec('wc', stdin = process_1.stdout)
[...]
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/subprocess.py", line 1388, in _get_handles
     p2cread = stdin.fileno()
AttributeError: 'StreamReader' object has no attribute 'fileno'

我怎样才能得到与上面同步示例相同的结果?

【问题讨论】:

不相关:您应该在Popen(['wc'],..) 之后添加process_1.stdout.close()。见How do I use subprocess.Popen to connect multiple processes by pipes? 【参考方案1】:

在 asyncio 中,process.stdout 实际上是一个StreamReader,而不是一个文件对象。可以通过process._transport._proc.stdout 访问文件对象。不幸的是,您将无法使用它,因为它已经在事件循环中注册以提供流接口process.stdout

处理该问题的一种方法是创建自己的管道并将文件描述符传递给子进程:

async def main():
    read, write = os.pipe()
    process_1 = await create_subprocess_exec('ls', stdout=write)
    os.close(write)
    process_2 = await create_subprocess_exec('wc', stdin=read, stdout=PIPE)
    os.close(read)
    return await process_2.stdout.read()

请注意,一旦第一个子进程启动,write 文件描述符应该被显式关闭(除非您使用subprocess.PIPE,否则它不会自动关闭)。 read 文件描述符也需要关闭,如 here 所述。

【讨论】:

您能否为语句提供参考:“只有使用 subprocess.PIPE 时才会自动关闭”?据我了解,您希望父级中的os.close(write) 的原因是,如果wc 过早死亡,ls 将得到SIGPIPEEPIPEsubprocess.PIPE 也是如此(至少对于普通的Popen() 来说,如the code example in the docs 所示) @J.F.Sebastian 我在查看the code 时注意到了它,我不知道它是否记录在某个地方。但是,您的示例是关于关闭读者,而不是作者。在我的示例中,需要关闭编写器才能正确设置管道(请参阅this answer)。 您对管道的 write 末端是正确的(它已关闭,因为它在父级中未使用 - 它也可以解释为什么它是创建 subprocess.PIPE 的代码的责任管道关闭它——它与asyncio无关,相应的os.close()调用在subprocess模块中)。我引用的示例对应于上面的process_1.stdout.close(),即应该调用os.close(read) @J.F.Sebastian 你说得对,我没有意识到读者也需要关闭。感谢您的洞察力,请参阅我的编辑。

以上是关于连接两个以 asyncio.subprocess.create_subprocess_exec() 开始的进程的主要内容,如果未能解决你的问题,请参考以下文章

Python asyncio.subprocess():如何查看它是不是还在运行

使用 asyncio.create_subprocess_exec 设置最大并发

任何人都可以将此 rsync 命令解析为与 asyncio.subprocess 一起使用的列表吗?

为啥在不同线程中调用 asyncio subprocess.communicate 会挂起?

从 async.subprocess.PIPE 读取

输出 asyncio 子进程调用的命令行?