Python中子进程读取线超时
Posted
技术标签:
【中文标题】Python中子进程读取线超时【英文标题】:Timeout on subprocess readline in Python 【发布时间】:2012-06-01 03:39:21 【问题描述】:我有一个小问题,我不太确定如何解决。这是一个最小的例子:
我有什么
scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
line = scan_process.stdout.readline()
some_criterium = do_something(line)
我想要什么
scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
line = scan_process.stdout.readline()
if nothing_happens_after_10s:
break
else:
some_criterium = do_something(line)
我从子进程中读取了一行并对其进行了处理。如果在固定时间间隔后没有线路到达,我该如何退出?
【问题讨论】:
相关:Non-blocking read on a subprocess.PIPE in python 相关:Stop reading process output in Python without hang? 耳语标准 @SteveCarter 是的,措辞可以改进。我很乐意接受相应的修改。 【参考方案1】:感谢大家的回答!
我找到了一种方法来解决我的问题,只需使用 select.poll 来查看标准输出。
import select
...
scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
poll_obj = select.poll()
poll_obj.register(scan_process.stdout, select.POLLIN)
while(some_criterium and not time_limit):
poll_result = poll_obj.poll(0)
if poll_result:
line = scan_process.stdout.readline()
some_criterium = do_something(line)
update(time_limit)
【讨论】:
虽然这似乎可行,但它并不可靠——考虑您的子进程是否在没有新行的情况下输出某些内容。select
/poll
会触发,但readline
会无限期阻塞。
可能不适用于 Windows,select.poll()
仅适用于套接字。 docs.python.org/2/library/select.html
我没有在 Windows 中测试过这个解决方案,所以你可能是对的,我知道它可以在 OSX 和 Linux 下运行。
@gentimouton: asyncio
can read subprocess' output asynchroniously in a portable manner
@DimaTisnek,所以如果根本没有回行,程序还是会被readline永远阻塞?【参考方案2】:
这是一个便携式解决方案,它使用asyncio
强制读取单行超时:
#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT
async def run_command(*args, timeout=None):
# Start child process
# NOTE: universal_newlines parameter is not supported
process = await asyncio.create_subprocess_exec(*args,
stdout=PIPE, stderr=STDOUT)
# Read line (sequence of bytes ending with b'\n') asynchronously
while True:
try:
line = await asyncio.wait_for(process.stdout.readline(), timeout)
except asyncio.TimeoutError:
pass
else:
if not line: # EOF
break
elif do_something(line):
continue # While some criterium is satisfied
process.kill() # Timeout or some criterion is not satisfied
break
return await process.wait() # Wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop() # For subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
returncode = loop.run_until_complete(run_command("cmd", "arg 1", "arg 2",
timeout=10))
loop.close()
【讨论】:
这太棒了,伟大的工作!对于可能有不止一条预期线路的其他人,我建议使用process.stdout.read()
而不是 readline()
。
@jftuga: .read()
在这里不正确。问题是关于.readline()
。如果您需要所有输出,那么使用带有超时的.communicate()
会更简单。阅读my comment under the answer that uses .communicate()
。
@JanKaifer 是的。指向 Python 3 文档的链接和显式的 shebang #!... python3
都指向 Python 3。当前的 Python 版本是 3.6。答案中的语法是 Python 3.5(2015 年发布)。
如果您可以将所做的一切切换到asyncio
,那就太好了。想要使用queue.Queue
与任何东西进行交互?很难,这打破了asyncio
。有要注册回调的非asyncio
库吗?艰难的。 asyncio
与其他任何事物都不能很好地互动,而且似乎总是比它的价值更麻烦。
@Tom:除非不是很明显,否则您可以在 asyncio 代码中与不使用 asyncio
的代码进行交互,例如,asyncio.to_thread,是的,处理异步与异步代码。阻塞分隔(具有彩色功能)是一个普遍的问题journal.stuffwithstuff.com/2015/02/01/…【参考方案3】:
我在 Python 中使用了一些更通用的东西(如果我没记错的话,也是从 *** 问题拼凑而成的,但我不记得是哪些问题了)。
import thread
from threading import Timer
def run_with_timeout(timeout, default, f, *args, **kwargs):
if not timeout:
return f(*args, **kwargs)
try:
timeout_timer = Timer(timeout, thread.interrupt_main)
timeout_timer.start()
result = f(*args, **kwargs)
return result
except KeyboardInterrupt:
return default
finally:
timeout_timer.cancel()
不过,请注意。这使用中断来停止你给它的任何功能。这对于所有功能来说可能不是一个好主意,它还会阻止您在超时期间使用 Ctrl + C 关闭程序(即 Ctrl + C 将被视为超时)。
您可以使用它并将其称为:
scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
line = run_with_timeout(timeout, None, scan_process.stdout.readline)
if line is None:
break
else:
some_criterium = do_something(line)
不过,这可能有点矫枉过正。我怀疑对于您的情况有一个我不知道的更简单的选择。
【讨论】:
不必为每一行创建一个新线程:a single watchdog thread is enough 像魅力一样工作,应该被选为最佳:-) 谢谢@Flogo! 将前两行放在try-block中不是更好吗,即“timeout_timer = Timer( ....upto.... timer.start()”在try-except之外? @AshKetchum:timeout_timer.start()
行应该在 try 块中。想象一下,你有一个非常短的时间限制,并且在启动线程之后和进入 try-block 之前有一个上下文切换。从理论上讲,这可能会导致将KeyboardInterrupt
发送到主线程。我猜,初始化 Timer 的行可能在外面。
似乎不适用于 Ubuntu 18.04、python 3.6.9。尽管_thread.interrupt_main()
被执行,scan_process.stdout.readline()
不能被中断。【参考方案4】:
虽然Tom's solution 有效,但在C 成语中使用select()
更紧凑,这相当于您的答案:
from select import select
scan_process = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1) # Line buffered
while some_criterium and not time_limit:
poll_result = select([scan_process.stdout], [], [], time_limit)[0]
其余的都一样。
见pydoc select.select
。
[注意:这是 Unix 特有的,其他一些答案也是如此。]
[注2:根据OP请求编辑添加行缓冲]
[注3:行缓冲可能并非在所有情况下都可靠,导致readline()阻塞]
【讨论】:
注意:这以及@Tom 的答案在 Windows 上不起作用,如果收到任何输入,它会重置超时。 OP 仅在收到换行符时才希望重置超时(尽管很容易满足此要求)。 另外,为了避免像@Tom 的回答那样阻塞.readline()
,在select
之后使用os.read(scan_process.stdout.fileno(), 512)
(如果其他东西可以访问管道,也不是100% 安全)但在select
之后被阻止的可能性比.readline()
更小。
我认为整个想法是阻塞直到读取一行或达到超时?...抱歉,如果我误解了。
想一想:如果您的代码在 readline()
上被阻止,那么您希望如何遵守超时
你不知道孩子的标准输出是否是行缓冲的(bufsize=1
对子进程没有影响;它只调节父进程中用于读取输出的缓冲区),通常是标准输出如果它被重定向到管道,则被块缓冲,即select()
可能会在没有完整行可用的情况下返回。【参考方案5】:
一个可移植的解决方案是,如果读取一行需要太长时间,则使用线程杀死子进程:
#!/usr/bin/env python3
from subprocess import Popen, PIPE, STDOUT
timeout = 10
with Popen(command, stdout=PIPE, stderr=STDOUT,
universal_newlines=True) as process: # text mode
# kill process in timeout seconds unless the timer is restarted
watchdog = WatchdogTimer(timeout, callback=process.kill, daemon=True)
watchdog.start()
for line in process.stdout:
# don't invoke the watcthdog callback if do_something() takes too long
with watchdog.blocked:
if not do_something(line): # some criterium is not satisfied
process.kill()
break
watchdog.restart() # restart timer just before reading the next line
watchdog.cancel()
其中WatchdogTimer
类类似于threading.Timer
,可以重新启动和/或阻止:
from threading import Event, Lock, Thread
from subprocess import Popen, PIPE, STDOUT
from time import monotonic # use time.time or monotonic.monotonic on Python 2
class WatchdogTimer(Thread):
"""Run *callback* in *timeout* seconds unless the timer is restarted."""
def __init__(self, timeout, callback, *args, timer=monotonic, **kwargs):
super().__init__(**kwargs)
self.timeout = timeout
self.callback = callback
self.args = args
self.timer = timer
self.cancelled = Event()
self.blocked = Lock()
def run(self):
self.restart() # don't start timer until `.start()` is called
# wait until timeout happens or the timer is canceled
while not self.cancelled.wait(self.deadline - self.timer()):
# don't test the timeout while something else holds the lock
# allow the timer to be restarted while blocked
with self.blocked:
if self.deadline <= self.timer() and not self.cancelled.is_set():
return self.callback(*self.args) # on timeout
def restart(self):
"""Restart the watchdog timer."""
self.deadline = self.timer() + self.timeout
def cancel(self):
self.cancelled.set()
【讨论】:
【参考方案6】:尝试使用 signal.alarm:
#timeout.py
import signal, sys
def timeout(sig, frm):
print "This is taking too long..."
sys.exit(1)
signal.signal(signal.SIGALRM, timeout)
signal.alarm(10)
byte = 0
while 'IT' not in open('/dev/urandom').read(2):
byte += 2
print "I got IT in %s byte(s)!" % byte
运行几次即可证明它有效:
$ python timeout.py
This is taking too long...
$ python timeout.py
I got IT in 4672 byte(s)!
有关更详细的示例,请参阅pGuides。
【讨论】:
这是 Unix-only,不能在 Windows 上工作,因为 SIGALRM 和 signal.alarm 不可用。以上是关于Python中子进程读取线超时的主要内容,如果未能解决你的问题,请参考以下文章