使用子进程获取实时输出
Posted
技术标签:
【中文标题】使用子进程获取实时输出【英文标题】:Getting realtime output using subprocess 【发布时间】:2010-10-22 15:24:19 【问题描述】:我正在尝试为命令行程序 (svnadmin verify) 编写一个包装脚本,该脚本将为操作显示一个不错的进度指示器。这要求我能够在输出后立即看到包装程序的每一行输出。
我想我只需使用subprocess.Popen
执行程序,使用stdout=PIPE
,然后读取每一行,并采取相应的行动。但是,当我运行以下代码时,输出似乎在某处缓冲,导致它出现在两个块中,第 1 行到第 332 行,然后是第 333 到 439 行(输出的最后一行)
from subprocess import Popen, PIPE, STDOUT
p = Popen('svnadmin verify /var/svn/repos/config', stdout = PIPE,
stderr = STDOUT, shell = True)
for line in p.stdout:
print line.replace('\n', '')
看了一些关于子进程的文档后,我发现bufsize
参数为Popen
,所以我尝试将 bufsize 设置为 1(每行缓冲)和 0(无缓冲),但两个值似乎都没有改变线路的交付方式。
此时我开始抓住稻草,所以我编写了以下输出循环:
while True:
try:
print p.stdout.next().replace('\n', '')
except StopIteration:
break
但得到了相同的结果。
是否可以获得使用子进程执行的程序的“实时”程序输出? Python 中是否还有其他向前兼容的选项(不是exec*
)?
【问题讨论】:
您是否尝试过省略sydout=PIPE
,以便子进程绕过父进程直接写入您的控制台?
问题是我想读取输出。如果它直接输出到控制台,我该怎么做?另外,我不希望用户看到包装程序的输出,只看到我的输出。
那为什么要“实时”显示呢?我不明白用例。
不要使用 shell=True。它不必要地调用你的 shell。使用 p = Popen(['svnadmin', 'verify', '/var/svn/repos/config'], stdout=PIPE, stderr=STDOUT) 代替
@S.Lott 基本上,svnadmin verify 会为每个经过验证的修订打印一行输出。我想制作一个不会导致过多输出的不错的进度指示器。有点像 wget,例如
【参考方案1】:
你可以试试这个:
import subprocess
import sys
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
while True:
out = process.stdout.read(1)
if out == '' and process.poll() != None:
break
if out != '':
sys.stdout.write(out)
sys.stdout.flush()
如果使用 readline 代替 read,会出现输入消息不打印的情况。尝试使用需要内联输入的命令并亲自查看。
【讨论】:
是的,使用 readline() 将停止打印(即使调用 sys.stdout.flush()) 这应该无限期挂起吗?我希望给定的解决方案还包括用于在初始子流程完成时编辑循环的样板代码。抱歉,无论我研究多少次,子流程等都是我无法开始工作的。 为什么要在 Python 中测试 '' 时我们可以使用,如果没有出来? 这是长时间运行作业的最佳解决方案。但它应该使用 is not None 而不是 != None。您不应将 != 与 None 一起使用。 stderr是否也由此显示?【参考方案2】:不久前我遇到了同样的问题。我的解决方案是放弃对 read
方法的迭代,即使您的子进程没有完成执行等,它也会立即返回。
【讨论】:
【参考方案3】:我试过这个,由于某种原因,代码中
for line in p.stdout:
...
积极缓冲,变体
while True:
line = p.stdout.readline()
if not line: break
...
没有。显然这是一个已知的错误:http://bugs.python.org/issue3907(该问题现已“关闭”,截至 2018 年 8 月 29 日)
【讨论】:
这不是旧 Python IO 实现中唯一的混乱。这就是为什么 Py2.6 和 Py3k 最终有了一个全新的 IO 库。 如果子进程返回一个空行,此代码将中断。更好的解决方案是使用while p.poll() is None
而不是while True
,并删除if not line
@exhuma:它工作正常。 readline 在一个空行上返回“\n”,它不会评估为真。它只在管道关闭时返回一个空字符串,这将是子进程终止的时候。
@Dave 供将来参考:在 py2+ 中使用 print(line.decode('utf-8').rstrip())
打印 utf-8 行。
另外,为了实时读取进程的输出,您需要告诉 python 您不需要任何缓冲。亲爱的 Python 直接给我输出。方法如下:您需要设置环境变量 PYTHONUNBUFFERED=1
。这对于无限的输出特别有用【参考方案4】:
将pexpect 与非阻塞读取行一起使用将解决此问题。它源于管道被缓冲的事实,因此您的应用程序的输出被管道缓冲,因此在缓冲区填满或进程终止之前您无法获得该输出。
【讨论】:
【参考方案5】:通过将缓冲区大小设置为 1,您实际上强制进程不缓冲输出。
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1)
for line in iter(p.stdout.readline, b''):
print line,
p.stdout.close()
p.wait()
【讨论】:
@nbro 可能是因为p.stdout.close()
不清楚。
@nbro 可能是因为给出的代码没有解释...:/
这个 b'' 是关于什么的?
@ManuelSchneid3r iter(<callable>, <string>)
使用 sentinel
)。如果您尝试多次运行p.stdout.readline
,您会看到当它没有其他内容可打印时,它会打印b''
,因此这是在这种情况下使用的合适标记。完整的解决方案:
import contextlib
import subprocess
# Unix, Windows and old Macintosh end-of-line
newlines = ['\n', '\r\n', '\r']
def unbuffered(proc, stream='stdout'):
stream = getattr(proc, stream)
with contextlib.closing(stream):
while True:
out = []
last = stream.read(1)
# Don't loop forever
if last == '' and proc.poll() is not None:
break
while last not in newlines:
# Don't loop forever
if last == '' and proc.poll() is not None:
break
out.append(last)
last = stream.read(1)
out = ''.join(out)
yield out
def example():
cmd = ['ls', '-l', '/']
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
# Make all end-of-lines '\n'
universal_newlines=True,
)
for line in unbuffered(proc):
print line
example()
【讨论】:
由于您在Popen()
调用中使用了universal_newlines=True
,因此您可能也不需要自己处理它们——这就是选项的全部意义所在。
似乎没必要复杂。它不能解决缓冲问题。见links in my answer。
这是我可以实时获得 rsync 进度输出的唯一方法(--outbuf=L)!谢谢【参考方案7】:
我使用这个解决方案来获取子进程的实时输出。此循环将在进程完成后立即停止,无需使用 break 语句或可能的无限循环。
sub_process = subprocess.Popen(my_command, close_fds=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while sub_process.poll() is None:
out = sub_process.stdout.read(1)
sys.stdout.write(out)
sys.stdout.flush()
【讨论】:
这是否有可能在标准输出缓冲区不为空的情况下退出循环? 我找了很多合适的答案,但在完成后没有挂起!我通过在out = sub_process...
之后添加if out=='': break
发现这是一个解决方案【参考方案8】:
实时输出问题已解决:
在捕获 C 程序的实时输出时,我在 Python 中遇到了类似的问题。我在我的 C 代码中添加了fflush(stdout);
。它对我有用。这是代码。
C 程序:
#include <stdio.h>
void main()
int count = 1;
while (1)
printf(" Count %d\n", count++);
fflush(stdout);
sleep(1);
Python 程序:
#!/usr/bin/python
import os, sys
import subprocess
procExe = subprocess.Popen(".//count", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
while procExe.poll() is None:
line = procExe.stdout.readline()
print("Print:" + line)
输出:
Print: Count 1
Print: Count 2
Print: Count 3
【讨论】:
这是唯一真正有用的东西。我在 C++ 中使用了相同的代码 (flush(stdout)
)。谢谢!
我在 python 脚本调用另一个 python 脚本作为子进程时遇到了同样的问题。在子进程打印中,“flush”是必要的(python 3 中的 print("hello", flush=True))。还有,那边很多例子还是(2020)python 2,这是python 3,所以+1
对于python3+,将line = procExe.stdout.readline()
改为line = procExe.stderr.readline()
【参考方案9】:
发现这个“即插即用”功能here。像魅力一样工作!
import subprocess
def myrun(cmd):
"""from
http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html
"""
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
stdout = []
while True:
line = p.stdout.readline()
stdout.append(line)
print line,
if line == '' and p.poll() != None:
break
return ''.join(stdout)
【讨论】:
stderr=subprocess.STDOUT
的添加实际上有助于捕获流数据。我赞成它。
这里的主要牛肉似乎来自accepted answer【参考方案10】:
您可以对子进程输出中的每个字节使用迭代器。这允许来自子进程的内联更新(以 '\r' 结尾的行覆盖先前的输出行):
from subprocess import PIPE, Popen
command = ["my_command", "-my_arg"]
# Open pipe to subprocess
subprocess = Popen(command, stdout=PIPE, stderr=PIPE)
# read each byte of subprocess
while subprocess.poll() is None:
for c in iter(lambda: subprocess.stdout.read(1) if subprocess.poll() is None else , b''):
c = c.decode('ascii')
sys.stdout.write(c)
sys.stdout.flush()
if subprocess.returncode != 0:
raise Exception("The subprocess did not terminate correctly.")
【讨论】:
【参考方案11】:这是我一直使用的基本骨架。它使实现超时变得容易,并且能够处理不可避免的挂起过程。
import subprocess
import threading
import Queue
def t_read_stdout(process, queue):
"""Read from stdout"""
for output in iter(process.stdout.readline, b''):
queue.put(output)
return
process = subprocess.Popen(['dir'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
cwd='C:\\',
shell=True)
queue = Queue.Queue()
t_stdout = threading.Thread(target=t_read_stdout, args=(process, queue))
t_stdout.daemon = True
t_stdout.start()
while process.poll() is None or not queue.empty():
try:
output = queue.get(timeout=.5)
except Queue.Empty:
continue
if not output:
continue
print(output),
t_stdout.join()
【讨论】:
【参考方案12】:您可以直接将子流程输出定向到流。简化示例:
subprocess.run(['ls'], stderr=sys.stderr, stdout=sys.stdout)
【讨论】:
这是否允许您在事后获得.communicate()
中的内容?还是内容丢失到父 stderr/stdout 流中?
不,在返回的 CompletedProcess
上没有 communicate()
方法。此外,capture_output
与 stdout
和 stderr
互斥。
这不是“实时”,这是这个问题的重点。这会等到ls
完成运行,并且不允许您访问其输出。 (另外,stdout
和 stderr
关键字参数是多余的 - 您只是明确指定默认值。)【参考方案13】:
根据用例,您可能还希望禁用子进程本身的缓冲。
如果子进程是 Python 进程,您可以在调用之前执行此操作:
os.environ["PYTHONUNBUFFERED"] = "1"
或者在env
参数中将其传递给Popen
。
否则,如果您使用的是 Linux/Unix,则可以使用 stdbuf
工具。例如。喜欢:
cmd = ["stdbuf", "-oL"] + cmd
另请参阅here 关于stdbuf
或其他选项。
(请参阅here 以获得相同的答案。)
【讨论】:
【参考方案14】:Kevin McCarthy 的 Streaming subprocess stdin and stdout with asyncio in Python 博客文章展示了如何使用 asyncio:
import asyncio
from asyncio.subprocess import PIPE
from asyncio import create_subprocess_exec
async def _read_stream(stream, callback):
while True:
line = await stream.readline()
if line:
callback(line)
else:
break
async def run(command):
process = await create_subprocess_exec(
*command, stdout=PIPE, stderr=PIPE
)
await asyncio.wait(
[
_read_stream(
process.stdout,
lambda x: print(
"STDOUT: ".format(x.decode("UTF8"))
),
),
_read_stream(
process.stderr,
lambda x: print(
"STDERR: ".format(x.decode("UTF8"))
),
),
]
)
await process.wait()
async def main():
await run("docker build -t my-docker-image:latest .")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
【讨论】:
嗨@Jeef 你能指出修复方法,以便我更新答案吗? 嗨,这对我有用,但我必须添加以下内容以消除一些错误消息:import nest_asyncio; nest_asyncio.apply()
并使用 shell 命令,即process = await create_subprocess_shell(*command, stdout=PIPE, stderr=PIPE, shell=True)
而不是 process = await create_subprocess_exec(...)
。干杯!【参考方案15】:
(此解决方案已使用 Python 2.7.15 测试) 你只需要在每行读/写后 sys.stdout.flush() :
while proc.poll() is None:
line = proc.stdout.readline()
sys.stdout.write(line)
# or print(line.strip()), you still need to force the flush.
sys.stdout.flush()
【讨论】:
【参考方案16】:在 Python 3.x 中,进程可能会挂起,因为输出是字节数组而不是字符串。确保将其解码为字符串。
从 Python 3.6 开始,您可以使用 Popen Constructor 中的参数 encoding
来实现。完整的例子:
process = subprocess.Popen(
'my_command',
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
encoding='utf-8',
errors='replace'
)
while True:
realtime_output = process.stdout.readline()
if realtime_output == '' and process.poll() is not None:
break
if realtime_output:
print(realtime_output.strip(), flush=True)
请注意,此代码 redirects stderr
到 stdout
和 handles output errors。
【讨论】:
process.poll() is not None
是什么意思?
如果 realtime_output == '' 为什么要测试?【参考方案17】:
建议 python 3.x 或 pthon 2.x 的答案很少,下面的代码对两者都适用。
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,)
stdout = []
while True:
line = p.stdout.readline()
if not isinstance(line, (str)):
line = line.decode('utf-8')
stdout.append(line)
print (line)
if (line == '' and p.poll() != None):
break
【讨论】:
【参考方案18】:如果您只想将日志实时转发到控制台
以下代码适用于两者
p = subprocess.Popen(cmd,
shell=True,
cwd=work_dir,
bufsize=1,
stdin=subprocess.PIPE,
stderr=sys.stderr,
stdout=sys.stdout)
【讨论】:
这是不必要的并发症的邪恶组合。如果您不想更改它们的发送位置,请不要为stderr
和stdout
指定任何内容。 cwd=work_dir
和 shell=True
在这里似乎格格不入,bufsize=1
似乎有点可疑,尤其是没有任何解释。【参考方案19】:
def run_command(command):
process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE)
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
print(output.strip())
rc = process.poll()
return rc
【讨论】:
以上是关于使用子进程获取实时输出的主要内容,如果未能解决你的问题,请参考以下文章