Python中子进程读取线超时

Posted

技术标签:

【中文标题】Python中子进程读取线超时【英文标题】:Timeout on subprocess readline in Python 【发布时间】:2012-06-01 03:39:21 【问题描述】:

我有一个小问题,我不太确定如何解决。这是一个最小的例子:

我有什么

scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
    line = scan_process.stdout.readline()
    some_criterium = do_something(line)

我想要什么

scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
    line = scan_process.stdout.readline()
    if nothing_happens_after_10s:
        break
    else:
        some_criterium = do_something(line)

我从子进程中读取了一行并对其进行了处理。如果在固定时间间隔后没有线路到达,我该如何退出?

【问题讨论】:

相关:Non-blocking read on a subprocess.PIPE in python 相关:Stop reading process output in Python without hang? 耳语标准 @SteveCarter 是的,措辞可以改进。我很乐意接受相应的修改。 【参考方案1】:

感谢大家的回答!

我找到了一种方法来解决我的问题,只需使用 select.poll 来查看标准输出。

import select
...
scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
poll_obj = select.poll()
poll_obj.register(scan_process.stdout, select.POLLIN)
while(some_criterium and not time_limit):
    poll_result = poll_obj.poll(0)
    if poll_result:
        line = scan_process.stdout.readline()
        some_criterium = do_something(line)
    update(time_limit)

【讨论】:

虽然这似乎可行,但它并不可靠——考虑您的子进程是否在没有新行的情况下输出某些内容。 select/poll 会触发,但readline 会无限期阻塞。 可能不适用于 Windows,select.poll() 仅适用于套接字。 docs.python.org/2/library/select.html 我没有在 Windows 中测试过这个解决方案,所以你可能是对的,我知道它可以在 OSX 和 Linux 下运行。 @gentimouton: asyncio can read subprocess' output asynchroniously in a portable manner @DimaTisnek,所以如果根本没有回行,程序还是会被readline永远阻塞?【参考方案2】:

这是一个便携式解决方案,它使用asyncio 强制读取单行超时:

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

async def run_command(*args, timeout=None):
    # Start child process
    # NOTE: universal_newlines parameter is not supported
    process = await asyncio.create_subprocess_exec(*args,
            stdout=PIPE, stderr=STDOUT)

    # Read line (sequence of bytes ending with b'\n') asynchronously
    while True:
        try:
            line = await asyncio.wait_for(process.stdout.readline(), timeout)
        except asyncio.TimeoutError:
            pass
        else:
            if not line: # EOF
                break
            elif do_something(line):
                continue # While some criterium is satisfied
        process.kill() # Timeout or some criterion is not satisfied
        break
    return await process.wait() # Wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop() # For subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

returncode = loop.run_until_complete(run_command("cmd", "arg 1", "arg 2",
                                                 timeout=10))
loop.close()

【讨论】:

这太棒了,伟大的工作!对于可能有不止一条预期线路的其他人,我建议使用 process.stdout.read() 而不是 readline() @jftuga: .read() 在这里不正确。问题是关于.readline()。如果您需要所有输出,那么使用带有超时的.communicate() 会更简单。阅读my comment under the answer that uses .communicate() @JanKaifer 是的。指向 Python 3 文档的链接和显式的 shebang #!... python3 都指向 Python 3。当前的 Python 版本是 3.6。答案中的语法是 Python 3.5(2015 年发布)。 如果您可以将所做的一切切换到asyncio,那就太好了。想要使用queue.Queue 与任何东西进行交互?很难,这打破了asyncio。有要注册回调的非asyncio 库吗?艰难的。 asyncio 与其他任何事物都不能很好地互动,而且似乎总是比它的价值更麻烦。 @Tom:除非不是很明显,否则您可以在 asyncio 代码中与不使用 asyncio 的代码进行交互,例如,asyncio.to_thread,是的,处理异步与异步代码。阻塞分隔(具有彩色功能)是一个普遍的问题journal.stuffwithstuff.com/2015/02/01/…【参考方案3】:

我在 Python 中使用了一些更通用的东西(如果我没记错的话,也是从 *** 问题拼凑而成的,但我不记得是哪些问题了)。

import thread
from threading import Timer

def run_with_timeout(timeout, default, f, *args, **kwargs):
    if not timeout:
        return f(*args, **kwargs)
    try:
        timeout_timer = Timer(timeout, thread.interrupt_main)
        timeout_timer.start()
        result = f(*args, **kwargs)
        return result
    except KeyboardInterrupt:
        return default
    finally:
        timeout_timer.cancel()

不过,请注意。这使用中断来停止你给它的任何功能。这对于所有功能来说可能不是一个好主意,它还会阻止您在超时期间使用 Ctrl + C 关闭程序(即 Ctrl + C 将被视为超时)。

您可以使用它并将其称为:

scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
    line = run_with_timeout(timeout, None, scan_process.stdout.readline)
    if line is None:
        break
    else:
        some_criterium = do_something(line)

不过,这可能有点矫枉过正。我怀疑对于您的情况有一个我不知道的更简单的选择。

【讨论】:

不必为每一行创建一个新线程:a single watchdog thread is enough 像魅力一样工作,应该被选为最佳:-) 谢谢@Flogo! 将前两行放在try-block中不是更好吗,即“timeout_timer = Timer( ....upto.... timer.start()”在try-except之外? @AshKetchum:timeout_timer.start() 行应该在 try 块中。想象一下,你有一个非常短的时间限制,并且在启动线程之后和进入 try-block 之前有一个上下文切换。从理论上讲,这可能会导致将KeyboardInterrupt 发送到主线程。我猜,初始化 Timer 的行可能在外面。 似乎不适用于 Ubuntu 18.04、python 3.6.9。尽管_thread.interrupt_main() 被执行,scan_process.stdout.readline() 不能被中断。【参考方案4】:

虽然Tom's solution 有效,但在C 成语中使用select() 更紧凑,这相当于您的答案:

from select import select
scan_process = subprocess.Popen(command,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                bufsize=1)  # Line buffered
while some_criterium and not time_limit:
    poll_result = select([scan_process.stdout], [], [], time_limit)[0]

其余的都一样。

pydoc select.select

[注意:这是 Unix 特有的,其他一些答案也是如此。]

[注2:根据OP请求编辑添加行缓冲]

[注3:行缓冲可能并非在所有情况下都可靠,导致readline()阻塞]

【讨论】:

注意:这以及@Tom 的答案在 Windows 上不起作用,如果收到任何输入,它会重置超时。 OP 仅在收到换行符时才希望重置超时(尽管很容易满足此要求)。 另外,为了避免像@Tom 的回答那样阻塞.readline(),在select 之后使用os.read(scan_process.stdout.fileno(), 512)(如果其他东西可以访问管道,也不是100% 安全)但在select 之后被阻止的可能性比.readline() 更小。 我认为整个想法是阻塞直到读取一行或达到超时?...抱歉,如果我误解了。 想一想:如果您的代码在 readline() 上被阻止,那么您希望如何遵守超时 你不知道孩子的标准输出是否是行缓冲的(bufsize=1 对子进程没有影响;它只调节父进程中用于读取输出的缓冲区),通常是标准输出如果它被重定向到管道,则被块缓冲,即select() 可能会在没有完整行可用的情况下返回。【参考方案5】:

一个可移植的解决方案是,如果读取一行需要太长时间,则使用线程杀死子进程:

#!/usr/bin/env python3
from subprocess import Popen, PIPE, STDOUT

timeout = 10
with Popen(command, stdout=PIPE, stderr=STDOUT,
           universal_newlines=True) as process:  # text mode
    # kill process in timeout seconds unless the timer is restarted
    watchdog = WatchdogTimer(timeout, callback=process.kill, daemon=True)
    watchdog.start()
    for line in process.stdout:
        # don't invoke the watcthdog callback if do_something() takes too long
        with watchdog.blocked:
            if not do_something(line):  # some criterium is not satisfied
                process.kill()
                break
            watchdog.restart()  # restart timer just before reading the next line
    watchdog.cancel()

其中WatchdogTimer 类类似于threading.Timer,可以重新启动和/或阻止:

from threading import Event, Lock, Thread
from subprocess import Popen, PIPE, STDOUT
from time import monotonic  # use time.time or monotonic.monotonic on Python 2

class WatchdogTimer(Thread):
    """Run *callback* in *timeout* seconds unless the timer is restarted."""

    def __init__(self, timeout, callback, *args, timer=monotonic, **kwargs):
        super().__init__(**kwargs)
        self.timeout = timeout
        self.callback = callback
        self.args = args
        self.timer = timer
        self.cancelled = Event()
        self.blocked = Lock()

    def run(self):
        self.restart() # don't start timer until `.start()` is called
        # wait until timeout happens or the timer is canceled
        while not self.cancelled.wait(self.deadline - self.timer()):
            # don't test the timeout while something else holds the lock
            # allow the timer to be restarted while blocked
            with self.blocked:
                if self.deadline <= self.timer() and not self.cancelled.is_set():
                    return self.callback(*self.args)  # on timeout

    def restart(self):
        """Restart the watchdog timer."""
        self.deadline = self.timer() + self.timeout

    def cancel(self):
        self.cancelled.set()

【讨论】:

【参考方案6】:

尝试使用 signal.alarm:

#timeout.py
import signal, sys

def timeout(sig, frm):
  print "This is taking too long..."
  sys.exit(1)

signal.signal(signal.SIGALRM, timeout)
signal.alarm(10)
byte = 0

while 'IT' not in open('/dev/urandom').read(2):
  byte += 2
print "I got IT in %s byte(s)!" % byte

运行几次即可证明它有效:

$ python timeout.py 
This is taking too long...
$ python timeout.py 
I got IT in 4672 byte(s)!

有关更详细的示例,请参阅pGuides。

【讨论】:

这是 Unix-only,不能在 Windows 上工作,因为 SIGALRM 和 signal.alarm 不可用。

以上是关于Python中子进程读取线超时的主要内容,如果未能解决你的问题,请参考以下文章

Python中子进程中的列表索引超出范围

重定向Go中子进程的stdout管道

如果在使用管道时子进程的数量大于处理器,进程会被阻塞吗?

在 C 中使用 execve 加载程序时子进程如何终止

E: 安装google or-tools时子进程/usr/bin/dpkg返回错误码(1)

C linux中子进程与父进程之间的通信:父进程不阻塞