如何使用 asyncio 从使用 SubprocessProtocol 的子进程中读取并在任意点终止该子进程?

Posted

技术标签:

【中文标题】如何使用 asyncio 从使用 SubprocessProtocol 的子进程中读取并在任意点终止该子进程?【英文标题】:How to use asyncio to read from a subprocess using SubprocessProtocol and terminate that subprocess at an arbitrary point? 【发布时间】:2019-06-10 09:13:14 【问题描述】:

使用答案here 作为基础(使用SubprocessProtocol),我只是尝试从子进程读取并在我选择的点停止读取(并终止子进程)(例如,我'已经读取了足够的数据)。

请注意,我确实想要使用 run_until_complete per another discussion 的好处。

我碰巧使用的是 Windows,下面的示例使用的是 Cygwin 的 cat。我使用的实际实用程序只是一个本机 Windows 控制台应用程序 - 但它会一直流式传输,直到手动关闭。

我可以很好地读取数据,但是我尝试停止读取并关闭子进程(例如,从 pipe_data_received() 中调用 loop.stop())会导致异常(RuntimeError: Event loop is closedValueError: I/O operation on closed pipe)。我想立即优雅地终止子进程。

我不认为这是一个平台,因为我没有看到在哪里适当地打断事情以获得预期的效果。关于如何实现这一点的任何想法?

我的 Python 3.7+ 代码(根据示例修改):

import asyncio
import os

external_program = "cat"  # Something that will output to stdio
external_option = "a"  # An arbitrarily large amount of data
saved_data = []

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            data_len = len(data)
            print(''.join(' :02x'.format(x) for x in data), flush=True)
            saved_data.extend(data)
            if len(saved_data) > 512:  # Stop once we've read this much data
                loop.call_soon_threadsafe(loop.stop)
    def connection_lost(self, exc):
        print("Connection lost")
        loop.stop() # end loop.run_forever()

print("START")

if os.name == 'nt':
    # On Windows, the ProactorEventLoop is necessary to listen on pipes
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

try:
    loop.run_until_complete(
        loop.subprocess_exec(
            SubprocessProtocol, 
            external_program,
            external_option,
        )
    )
    loop.run_forever()
finally:
    loop.close()

print("DONE")
loop.close()

【问题讨论】:

【参考方案1】:

不是 asyncio 专家,但这样的东西应该可以工作。

import time
import asyncio
import threading

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def __init__(self, loop):
        self.transport = None
        self.loop = loop

    def pipe_data_received(self, fd, data):
        print('data received')

    def connection_lost(self, exc):
        print("Connection lost")

    def connection_made(self, transport):
        print("Connection made")

        self.transport = transport

        # becasue calc won't call pipe_data_received method.
        t = threading.Thread(target=self._endme)
        t.setDaemon(True)
        t.start()

    def _endme(self):
        time.sleep(2)
        # You'd normally use these inside pipe_data_received, connection_lost methods
        self.transport.close()
        self.loop.stop()


def main():
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(loop.subprocess_exec(
            lambda: SubprocessProtocol(loop), 
            'calc.exe'
        ))

    loop.run_forever()
    loop.close()

if __name__ == "__main__":
    main()

【讨论】:

好吧,这与 calc.exe 配合得非常好(至少在 Windows 10 中)。但是,它似乎与我实际需要使用的命令行实用程序配合得很好,所以这很酷(也是最重要的部分)!我会稍微研究一下,看看我是否可以让它在阅读极限方面做我需要它做的事情,但它看起来很有希望...... 哦,真的吗?我在 Windows 8.1 上尝试过,没有发现“calc.exe”有任何问题。无论如何,很高兴听到它有帮助。我不明白为什么它也不应该在pipe_data_received 中工作。请告诉我进展如何。 我认为 Windows 10 中的计算器与以前的应用程序不同——它会启动但不会关闭。我在您在此处建议的更改方面取得了进展,现在我只是想看看是否有任何 clean 方法来捕获 KeyboardInterrupt。另外(来自原始代码)我想知道将 loop.run_until_complete() 和 loop.run_forever() 放在一起的目的是什么...... 您应该能够通过 this 之类的方式实现两者。您可以使用Event 将KeyboardInterrupt 传递给SubprocessProtocol *您可能必须使用multiprocessing.Event,因为上面不是线程安全的,并且不确定它在多处理时的表现。

以上是关于如何使用 asyncio 从使用 SubprocessProtocol 的子进程中读取并在任意点终止该子进程?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 asyncio 从使用 SubprocessProtocol 的子进程中读取并在任意点终止该子进程?

如何在asyncio python中使用子进程模块限制并发进程数

使用 asyncio 运行协程后如何继续原始事件循环?

Flask 作者写万字长文谈 asyncio(上)

使用 asyncio ProactorEventLoop 时如何分配线程池

在这种情况下如何正确使用 asyncio?