Subprocess.Popen:将 stdout 和 stderr 克隆到终端和变量

Posted

技术标签:

【中文标题】Subprocess.Popen:将 stdout 和 stderr 克隆到终端和变量【英文标题】:Subprocess.Popen: cloning stdout and stderr both to terminal and variables 【发布时间】:2013-06-15 22:15:35 【问题描述】:

是否可以修改下面的代码以从 'stdout' 和 'stderr' 打印输出:

终端上打印(实时), 最后存储在 outserrs 变量中?

代码:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import subprocess

def run_cmd(command, cwd=None):
    p = subprocess.Popen(command, cwd=cwd, shell=False,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    outs, errs = p.communicate()
    rc = p.returncode
    outs = outs.decode('utf-8')
    errs = errs.decode('utf-8')

    return (rc, (outs, errs))

感谢@unutbu,特别感谢@j-f-sebastian,最终功能:

#!/usr/bin/python3
# -*- coding: utf-8 -*-


import sys
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread


def read_output(pipe, funcs):
    for line in iter(pipe.readline, b''):
        for func in funcs:
            func(line.decode('utf-8'))
    pipe.close()


def write_output(get):
    for line in iter(get, None):
        sys.stdout.write(line)


def run_cmd(command, cwd=None, passthrough=True):
    outs, errs = None, None

    proc = Popen(
        command,
        cwd=cwd,
        shell=False,
        close_fds=True,
        stdout=PIPE,
        stderr=PIPE,
        bufsize=1
        )

    if passthrough:

        outs, errs = [], []

        q = Queue()

        stdout_thread = Thread(
            target=read_output, args=(proc.stdout, [q.put, outs.append])
            )

        stderr_thread = Thread(
            target=read_output, args=(proc.stderr, [q.put, errs.append])
            )

        writer_thread = Thread(
            target=write_output, args=(q.get,)
            )

        for t in (stdout_thread, stderr_thread, writer_thread):
            t.daemon = True
            t.start()

        proc.wait()

        for t in (stdout_thread, stderr_thread):
            t.join()

        q.put(None)

        outs = ' '.join(outs)
        errs = ' '.join(errs)

    else:

        outs, errs = proc.communicate()
        outs = '' if outs == None else outs.decode('utf-8')
        errs = '' if errs == None else errs.decode('utf-8')

    rc = proc.returncode

    return (rc, (outs, errs))

【问题讨论】:

代码示例确实存储了outserrs 并返回它们...要打印到终端,只需if outs: print outs if errs: print errs @bnlucas 谢谢,但正如我在第一点所说:输出应该实时打印到终端,就像没有管道一样。 如果你需要 Python 3 代码;添加python-3.x 标签(我在shebang中看到python3)。您编写的代码将使阅读线程挂起。在 Python 3 中,'' 是一个 Unicode 文字,但 pipe.readline() 默认返回字节('' != b"" 在 Python 3 上)。如果你修复它,那么编写器线程将不会结束,因为没有任何东西将 "" 放入队列中。 相关:Displaying subprocess output to stdout and redirecting it 【参考方案1】:

要在单个线程中逐行从子进程中同时捕获和显示 stdout 和 stderr,您可以使用异步 I/O:

#!/usr/bin/env python3
import asyncio
import os
import sys
from asyncio.subprocess import PIPE

@asyncio.coroutine
def read_stream_and_display(stream, display):
    """Read from stream line by line until EOF, display, and capture the lines.

    """
    output = []
    while True:
        line = yield from stream.readline()
        if not line:
            break
        output.append(line)
        display(line) # assume it doesn't block
    return b''.join(output)

@asyncio.coroutine
def read_and_display(*cmd):
    """Capture cmd's stdout, stderr while displaying them as they arrive
    (line by line).

    """
    # start process
    process = yield from asyncio.create_subprocess_exec(*cmd,
            stdout=PIPE, stderr=PIPE)

    # read child's stdout/stderr concurrently (capture and display)
    try:
        stdout, stderr = yield from asyncio.gather(
            read_stream_and_display(process.stdout, sys.stdout.buffer.write),
            read_stream_and_display(process.stderr, sys.stderr.buffer.write))
    except Exception:
        process.kill()
        raise
    finally:
        # wait for the process to exit
        rc = yield from process.wait()
    return rc, stdout, stderr

# run the event loop
if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
rc, *output = loop.run_until_complete(read_and_display(*cmd))
loop.close()

【讨论】:

这段代码看起来不错,你能添加一个 Python 2.7 的版本吗? @kinORnirvana: asyncio 仅适用于 Python 3.3+ 有 trollius——一个 Python 2 克隆,但 it is deprecated 干得好,J.F!我只是“借用”了this answer 的代码。如果您有任何 cmets、建议和/或更好的答案,我们将不胜感激。 请注意,一旦循环关闭,执行get_event_loop 将获得相同的关闭循环,无法按原样重复使用(event loop is closed 消息)。我最终做了asyncio.set_event_loop(asyncio.new_event_loop()) 来获得一个新的事件循环。 我在 Jupyter 笔记本中运行此代码。我得到了一个AttributeError,因为sys.stdout.buffer 不再存在。这有助于清除它:docs.python.org/3/library/sys.html#sys.stderr 在 Jupyter 笔记本中,我使用 sys.stdout.write 代替 sys.stdout.buffer.write,输出出现在笔记本日志输出中。【参考方案2】:

您可以生成线程来读取 stdout 和 stderr 管道、写入公共队列以及追加到列表。然后使用第三个线程打印队列中的项目。

import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE


def read_output(pipe, funcs):
    for line in iter(pipe.readline, ''):
        for func in funcs:
            func(line)
            # time.sleep(1)
    pipe.close()

def write_output(get):
    for line in iter(get, None):
        sys.stdout.write(line)

process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True, bufsize=1)
q = Queue.Queue()
out, err = [], []
tout = threading.Thread(
    target=read_output, args=(process.stdout, [q.put, out.append]))
terr = threading.Thread(
    target=read_output, args=(process.stderr, [q.put, err.append]))
twrite = threading.Thread(target=write_output, args=(q.get,))
for t in (tout, terr, twrite):
    t.daemon = True
    t.start()
process.wait()
for t in (tout, terr):
    t.join()
q.put(None)
print(out)
print(err)

使用第三个线程的原因——而不是让前两个线程都直接打印到终端——是为了防止两个打印语句同时发生,这有时会导致文本乱码。


上面调用random_print.py,随机打印到stdout和stderr:

import sys
import time
import random

for i in range(50):
    f = random.choice([sys.stdout,sys.stderr])
    f.write(str(i)+'\n')
    f.flush()
    time.sleep(0.1)

这个解决方案借鉴了J. F. Sebastian, here的代码和想法。


这是类 Unix 系统的替代解决方案,使用 select.select

import collections
import select
import fcntl
import os
import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE

def make_async(fd):
    # https://***.com/a/7730201/190597
    '''add the O_NONBLOCK flag to a file descriptor'''
    fcntl.fcntl(
        fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)

def read_async(fd):
    # https://***.com/a/7730201/190597
    '''read some data from a file descriptor, ignoring EAGAIN errors'''
    # time.sleep(1)
    try:
        return fd.read()
    except IOError, e:
        if e.errno != errno.EAGAIN:
            raise e
        else:
            return ''

def write_output(fds, outmap):
    for fd in fds:
        line = read_async(fd)
        sys.stdout.write(line)
        outmap[fd.fileno()].append(line)

process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True)

make_async(process.stdout)
make_async(process.stderr)
outmap = collections.defaultdict(list)
while True:
    rlist, wlist, xlist = select.select([process.stdout, process.stderr], [], [])
    write_output(rlist, outmap)
    if process.poll() is not None:
        write_output([process.stdout, process.stderr], outmap)
        break

fileno = 'stdout': process.stdout.fileno(),
          'stderr': process.stderr.fileno()

print(outmap[fileno['stdout']])
print(outmap[fileno['stderr']])

此解决方案使用来自Adam Rosenfield's post, here 的代码和想法。

【讨论】:

您可以在process.wait() 之后添加q.put(None) 并在None 上退出第三个线程,例如for line in iter(get, None):pipe.close() 也不见了。 @J.F.Sebastian:感谢您的更正。假设read_output 出于某种原因无法跟上写入pipe 的输出。 (我尝试用上面的time.sleep(1) 来模拟它)。当 time.sleep(1) 未注释时,outerr 无法在 process.wait() 完成之前收集所有输出。你知道保证outerr 得到所有输出的方法吗? terr,out.join()put(None) 之前。顺便说一句,要“实时”获取行,bufsize=1 可能会有所帮助(忽略`块缓冲问题)

以上是关于Subprocess.Popen:将 stdout 和 stderr 克隆到终端和变量的主要内容,如果未能解决你的问题,请参考以下文章

实时 subprocess.Popen 通过 stdout 和 PIPE

管道输出subprocess.Popen到文件

如何从 subprocess.Popen() 获取输出。 proc.stdout.readline() 块,没有数据打印出来

python subprocess.popen stdin.write

子进程冻结 popen().stdout.read

subprocess.Popen与CLI提示交互