Python3 asyncio:wait_for()通信()超时,如何获得部分结果?

Posted

技术标签:

【中文标题】Python3 asyncio:wait_for()通信()超时,如何获得部分结果?【英文标题】:Python3 asyncio: wait_for() communicate() with timeout, how to get partial result? 【发布时间】:2017-03-07 04:02:26 【问题描述】:

The Python docs about asyncio - Subprocess say:

communicate()wait() 方法不带超时参数:使用 wait_for() 函数

使用wait_for()communicate() 施加超时非常容易,但是我找不到从中断的communicate() 调用中检索部分结果的方法,并且随后对communicate() 的调用没有也可以归还丢失的部分。

示例脚本:

#! /usr/bin/env python3

import asyncio

async def communicate_short(loop):
    p = await asyncio.create_subprocess_exec('ping', '127.0.0.1', '-n', '4', stdout=asyncio.subprocess.PIPE)
    # For Linux: use '-c' instead of '-n'

    try:
        # 2 seconds timeout
        res = await asyncio.wait_for(p.communicate(), 2)
    except asyncio.TimeoutError as e:
        # After timeout happens:
        # How do I get the subprocess's STDOUT up to this point?
        try:
            print(res[0].decode('utf-8'))
            # Will raise NameError since the communicate() call did not complete
        except NameError as e:
            print('NameError: %s' % e)


    res = await p.communicate()
    print(res[0].decode('utf-8'))
    # Only prints the later half of ping's STDOUT

if __name__ == '__main__':
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
    # For Linux: just do loop = asyncio.get_event_loop()

    loop.run_until_complete(communicate_short(loop))

示例脚本的输出:

NameError: local variable 'res' referenced before assignment
Reply from 127.0.0.1: bytes=32 time<1ms TTL=128
Reply from 127.0.0.1: bytes=32 time<1ms TTL=128

Ping statistics for 127.0.0.1:
    Packets: Sent = 4, Received = 4, Lost = 0 (0% loss),
Approximate round trip times in milli-seconds:
    Minimum = 0ms, Maximum = 0ms, Average = 0ms

请注意,仅打印最后 2 个数据包。前 2 个数据包的输出丢失。

那么,我应该如何在超时发生之前从子进程中获取输出?

编辑:更准确地说,理想情况下我正在寻找的东西是:

    执行communicate() 所做的工作,即异步写入子进程的 STDIN 并读取其 STDOUT 和 STDERR,而不会出现死锁 (that the docs ominously warn about);

    具有可配置的总超时时间,因此当子进程终止或达到超时时,将返回到目前为止接收到的 STDOUT 和 STDERR。

看起来这样的东西还不存在,必须实现它。

【问题讨论】:

你用了什么解决方案? @Udi 我将程序逻辑更改为简单地使用 readline() 来处理到达的行。 【参考方案1】:

对于您问题的第二部分,“我应该如何在超时发生之前从子进程获取输出?”我建议使用不会取消任务的asyncio.wait()(@987654323 @) 而不是 asyncio.wait_for()(取消任务):

task = asyncio.Task(p.communicate())
done, pending = await asyncio.wait([task], timeout=2)
if pending:
    print("timeout!", task._state)
res = await task  # Note: It is OK to await a task more than once
print(res[0].decode())

关于 "retrieve the partial results" ,我建议不要使用调用 stdout.read()communicate() 并使用不同的方法:

import asyncio


async def ping(loop, host):
    p = await asyncio.create_subprocess_exec(
        'ping', host, '-c', '4',
        stdout=asyncio.subprocess.PIPE, loop=loop)

    async for line in p.stdout:
        print(host, "==>", line.decode(), end="")

    print(host, "done")


if __name__ == '__main__':
    loop = loop = asyncio.get_event_loop()
    asyncio.set_event_loop(loop)
    tasks = [
        ping(loop, '8.8.8.8'),
        ping(loop, '127.0.0.1'),
        ping(loop, 'example.com'),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

结合这两种解决方案(并使用 readline() 而不是更酷的 async for)给出:

import asyncio


async def ping(loop, host):
    p = await asyncio.create_subprocess_exec(
        'ping', host, '-c', '10',
        stdout=asyncio.subprocess.PIPE, loop=loop)

    n = 0
    while True:
        n += 1
        task = asyncio.Task(p.stdout.readline())
        done, pending = await asyncio.wait([task], timeout=1)
        if not done:
            print(host, n, "==>", "Timeout!")
        line = await task
        if not line:
            break
        print(host, n, "==>", line.decode(), end="")

    print(host, "==>", "done")


if __name__ == '__main__':
    loop = loop = asyncio.get_event_loop()
    asyncio.set_event_loop(loop)
    tasks = [
        # ping(loop, '8.8.8.8'),
        # ping(loop, '127.0.0.1'),
        ping(loop, 'example.com'),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

请注意超时(1 秒)是每行。

另请参阅:https://github.com/aio-libs/async-timeout

【讨论】:

刚刚添加了一点问题。我希望这样的功能已经存在,但它看起来必须单独实现。 你是对的。 communiate() 工作 "...直到到达文件结尾..." 而不是部分结果。您可能可以使用asyncio.wait(tasks) 安全地与标准输出+标准输入通信。示例参考communicate()源代码。【参考方案2】:

如果预期在超时后杀死子进程,您可以获得如下部分输出:

future = asyncio.ensure_future(p.communicate())
done, pending = await asyncio.wait([future], timeout=2)
if pending:
    # timeout
    if p.returncode is None:
        # kill the subprocess, then `await future` will return soon
        try:
            p.kill()
        except ProcessLookupError:
            pass
output, err = await future
print(output.decode('utf-8'))

【讨论】:

【参考方案3】:

你必须使用process.stdout:

data = await process.stdout.readline()

请参阅user documentation 中的示例。

【讨论】:

以上是关于Python3 asyncio:wait_for()通信()超时,如何获得部分结果?的主要内容,如果未能解决你的问题,请参考以下文章

在超时中包装 asyncio.gather

python之async-timeout模块

python3.4的asyncio用法

Python3 asyncio 简介

asyncio--python3未来并发编程主流充满野心的模块

Python循环运行一定的秒数