Python: read from subprocess stdout and stderr separately while preserving order


Posted: 2015-10-28 07:28:03

Question:

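I have a Python subprocess whose output and error streams I'm trying to read. It currently works, but I can only read from stderr after I have finished reading from stdout. This is what it looks like: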

import subprocess

process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout_iterator = iter(process.stdout.readline, b"")
stderr_iterator = iter(process.stderr.readline, b"")

for line in stdout_iterator:
    # Do stuff with line
    print(line.decode(), end="")

for line in stderr_iterator:
    # Do stuff with line
    print(line.decode(), end="")

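As you can see, the stderr for loop cannot start until the stdout loop has finished. How can I change this so that I read lines from both streams in the order in which they actually arrive?

To clarify: I still need to be able to tell whether a line came from stdout or from stderr, because my code treats them differently.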

Comments on the question:

Related: Run command and get its stdout, stderr separately in near real time like in a terminal

Answer 1:

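The code in your question may deadlock if the child process writes enough output to stderr (about 100KB on my Linux machine).

There is a communicate() method that lets you read stdout and stderr separately: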

from subprocess import Popen, PIPE

process = Popen(command, stdout=PIPE, stderr=PIPE)
output, err = process.communicate()
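As a concrete, runnable illustration (a sketch; the child here is a hypothetical inline script launched through sys.executable), communicate() returns the two streams as separate byte strings once the child has exited. Because the data is only handed back at the end, the relative order of stdout and stderr writes is not recorded.

```python
import subprocess
import sys

# Hypothetical child: writes one line to stdout and one line to stderr
child_code = (
    "import sys\n"
    "print('to stdout')\n"
    "print('to stderr', file=sys.stderr)\n"
)

process = subprocess.Popen([sys.executable, "-c", child_code],
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# communicate() drains both pipes concurrently (so it cannot deadlock) and waits for exit
output, err = process.communicate()

print("stdout:", output.decode().strip())
print("stderr:", err.decode().strip())
print("exit code:", process.returncode)
```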

If you need to read the streams while the child process is still running, the portable solution is to use threads (not tested):

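from subprocess import Popen, PIPE
from threading import Thread
from queue import Queue  # Python 3; on Python 2 this is: from Queue import Queue

def reader(pipe, queue):
    try:
        with pipe:
            for line in iter(pipe.readline, b''):
                queue.put((pipe, line))
    finally:
        queue.put(None)  # sentinel: this pipe is exhausted

# No bufsize=1 here: line buffering only applies in text mode
process = Popen(command, stdout=PIPE, stderr=PIPE)
q = Queue()
Thread(target=reader, args=[process.stdout, q]).start()
Thread(target=reader, args=[process.stderr, q]).start()
for _ in range(2):  # one None sentinel arrives per reader thread
    for source, line in iter(q.get, None):
        name = "stdout" if source is process.stdout else "stderr"
        print("%s: %s" % (name, line.decode()), end="")
process.wait()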
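A self-contained sketch of the same thread-per-pipe technique, wrapped in a hypothetical helper read_labeled_lines() that labels each line "stdout" or "stderr" and is driven by an inline child script (also an assumption for illustration). Lines within one stream keep their order; the interleaving between the two streams is only approximate, as the comments below discuss.

```python
import subprocess
import sys
from queue import Queue
from threading import Thread


def reader(name, pipe, queue):
    """Put (name, line) for every line of pipe, then a None sentinel."""
    try:
        with pipe:
            for line in iter(pipe.readline, b""):
                queue.put((name, line))
    finally:
        queue.put(None)


def read_labeled_lines(cmd):
    """Run cmd and return a list of (stream_name, decoded_line) tuples."""
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    q = Queue()
    threads = [Thread(target=reader, args=("stdout", process.stdout, q)),
               Thread(target=reader, args=("stderr", process.stderr, q))]
    for t in threads:
        t.start()
    lines = []
    # Each reader sends exactly one None, so drain once per thread
    for _ in threads:
        for name, line in iter(q.get, None):
            lines.append((name, line.decode().rstrip("\r\n")))
    for t in threads:
        t.join()
    process.wait()
    return lines


# Hypothetical child that alternates between the two streams
child = ("import sys\n"
         "for i in range(3):\n"
         "    print('out', i, flush=True)\n"
         "    print('err', i, file=sys.stderr, flush=True)\n")
result = read_labeled_lines([sys.executable, "-c", child])
for name, line in result:
    print(f"{name}: {line}")
```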

See:

Python: read streaming input from subprocess.communicate()
Non-blocking read on a subprocess.PIPE in python
Python subprocess get children's output to file and terminal?

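Comments:

Unfortunately, this answer does not preserve the order of the lines coming from stdout and stderr. It is very close to what I need, though! It is important for me to know when a stderr line was piped relative to a stdout line.

@LukeSapan: I don't see any way to preserve the order and capture stdout/stderr separately at the same time. You can easily get one or the other. On Unix, you could try a select loop, which can make the effect less noticeable. This is starting to look like an XY problem: edit your question and give some context about what you are trying to do.

@LukeSapan Since the two FDs are independent of each other, a message going through one of them may be delayed, so in this case there is no notion of "before" and "after"...

@LukeSapan Why do you need to preserve the order? Just add timestamps and sort at the end.

@MoTSCHIGGE: Yes, there is, e.g.: Non-blocking read on a subprocess.PIPE in python

Answer 2: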

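The order in which a process writes data to different pipes is lost as soon as the data has been written.

You cannot tell whether stdout was written before stderr.

You can try to read from several file descriptors at once, non-blockingly, as soon as data becomes available, but this only reduces the chance of getting the order wrong.

This program should demonstrate it: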

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import select
import subprocess
import sys

# Child programs that write to stdout (fd 1) and stderr (fd 2) in a known order
testapps = {
    'slow': '''
import os
import time
os.write(1, b'aaa')
time.sleep(0.01)
os.write(2, b'bbb')
time.sleep(0.01)
os.write(1, b'ccc')
''',
    'fast': '''
import os
os.write(1, b'aaa')
os.write(2, b'bbb')
os.write(1, b'ccc')
''',
    'fast2': '''
import os
os.write(1, b'aaa')
os.write(2, b'bbbbbbbbbbbbbbb')
os.write(1, b'ccc')
''',
}


def readfds(fds, maxread):
    # Yield (fd, data) as soon as any fd is readable, until every fd hits EOF
    while True:
        fdsin, _, _ = select.select(fds, [], [])
        for fd in fdsin:
            s = os.read(fd, maxread)
            if len(s) == 0:
                fds.remove(fd)
                continue
            yield fd, s
        if fds == []:
            break


def readfromapp(app, rounds=10, maxread=1024):
    with open('testapp.py', 'w') as f:
        f.write(testapps[app])

    results = {}
    for i in range(0, rounds):
        p = subprocess.Popen([sys.executable, 'testapp.py'],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        data = b''
        for (fd, s) in readfds([p.stdout.fileno(), p.stderr.fileno()], maxread):
            data = data + s
        results[data] = results[data] + 1 if data in results else 1

    print('running %i rounds %s with maxread=%i' % (rounds, app, maxread))
    for data, count in sorted(results.items()):
        print('%03i x %s' % (count, data.decode()))


print()
print("=> if output is produced slowly this should work as wished")
print("   and should return: aaabbbccc")
readfromapp('slow',  rounds=100, maxread=1024)

print()
print("=> now mostly aaacccbbb is returned, not as it should be")
readfromapp('fast',  rounds=100, maxread=1024)

print()
print("=> you could try to read data one by one, and return")
print("   e.g. a whole line only when LF is read")
print("   (b's should be finished before c's)")
readfromapp('fast',  rounds=100, maxread=1)

print()
print("=> but even this won't work ...")
readfromapp('fast2', rounds=100, maxread=1)

It prints something like:

=> if output is produced slowly this should work as wished
   and should return: aaabbbccc
running 100 rounds slow with maxread=1024
100 x aaabbbccc

=> now mostly aaacccbbb is returned, not as it should be
running 100 rounds fast with maxread=1024
006 x aaabbbccc
094 x aaacccbbb

=> you could try to read data one by one, and return
   e.g. a whole line only when LF is read
   (b's should be finished before c's)
running 100 rounds fast with maxread=1
003 x aaabbbccc
003 x aababcbcc
094 x abababccc

=> but even this won't work ...
running 100 rounds fast2 with maxread=1
003 x aaabbbbbbbbbbbbbbbccc
001 x aaacbcbcbbbbbbbbbbbbb
008 x aababcbcbcbbbbbbbbbbb
088 x abababcbcbcbbbbbbbbbb

Comments:

Unrelated: use if not s: instead of if len(s) == 0: here. Use while fds: instead of while True: ... if fds == []: break. Use results = collections.defaultdict(int); ...; results[data] += 1 instead of results = {}; ...; results[data] = results[data] + 1 if data in results else 1, or use results = collections.Counter(); ...; results[data] += 1; ...; for data, count in results.most_common():. You could use data = b''.join([s for _, s in readfds(...)]). You should close the pipes to avoid relying on garbage collection to free the file descriptors in the parent process, and call p.wait() explicitly to reap the child process.

Note: if there are multiple parallel processes then "slow" might not be slow enough to get the desired output

Answer 3:

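I know this question is quite old, but this answer may help others who land on this page while looking for a solution to a similar problem, so I'm posting it anyway.

I have written a simple Python snippet that merges any number of pipes into a single one. Of course, as stated above, the order cannot be guaranteed, but I think this is as close as you can get in Python.

It starts one thread per pipe; each thread reads its pipe line by line and puts the lines into a queue (FIFO). The main thread loops over the queue and yields each line.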

import queue
import threading


def merge_pipes(**named_pipes):
    r'''
    Merges multiple pipes from subprocess.Popen (maybe other sources as well).
    The keyword argument keys will be used in the output to identify the source
    of the line.

    Example:
    p = subprocess.Popen(['some', 'call'],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    outputs = {'out': log.info, 'err': log.warning}
    for name, line in merge_pipes(out=p.stdout, err=p.stderr):
        outputs[name](line)

    This will output stdout to the info logger, and stderr to the warning logger
    '''

    # Constants. Could also be placed outside of the method. I just put them here
    # so the method is fully self-contained
    PIPE_OUTPUT = 1
    PIPE_CLOSED = 2

    # Create a queue where the pipes will be read into
    output = queue.Queue()

    # This method is the run body for the threads that are instantiated below.
    # It could easily be moved outside of merge_pipes, but to keep the method
    # fully self-contained I put it here
    def pipe_reader(name, pipe):
        r"""
        reads a single pipe into the queue
        """
        try:
            # Iterating the file object stops at EOF for both text and binary pipes
            # (iter(pipe.readline, '') never stops on a binary pipe, which returns b'')
            for line in pipe:
                output.put((PIPE_OUTPUT, name, line.rstrip()))
        finally:
            output.put((PIPE_CLOSED, name))

    # Start a reader for each pipe
    for name, pipe in named_pipes.items():
        t = threading.Thread(target=pipe_reader, args=(name, pipe))
        t.daemon = True
        t.start()

    # Count the open pipes up front. Counting "opened" messages sent by the threads
    # can reach zero too early if one pipe closes before another thread has started.
    pipe_count = len(named_pipes)

    # Read the queue in order, blocking if there's no data
    while pipe_count > 0:
        data = output.get()
        if data[0] == PIPE_CLOSED:
            pipe_count -= 1
        else:
            yield data[1:]

Comments:

Answer 4:

This works for me (on Windows): https://github.com/waszil/subpiper

from subpiper import subpiper

def my_stdout_callback(line: str):
    print(f'STDOUT: {line}')

def my_stderr_callback(line: str):
    print(f'STDERR: {line}')

my_additional_path_list = [r'c:\important_location']

retcode = subpiper(cmd='echo magic',
                   stdout_callback=my_stdout_callback,
                   stderr_callback=my_stderr_callback,
                   add_path_list=my_additional_path_list)

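Comments:

Answer 5:

Here is a solution based on selectors, but one that preserves order and streams variable-length chunks of characters (even single characters).

The trick is to use read1() instead of read().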

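import selectors
import subprocess
import sys

p = subprocess.Popen(
    [sys.executable, "random_out.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

sel = selectors.DefaultSelector()
sel.register(p.stdout, selectors.EVENT_READ)
sel.register(p.stderr, selectors.EVENT_READ)

# Keep going until both pipes have been unregistered (i.e. both reached EOF)
while sel.get_map():
    for key, _ in sel.select():
        # read1() returns the bytes available now instead of waiting for EOF
        # (calling it without an argument requires Python 3.7+)
        data = key.fileobj.read1().decode()
        if not data:
            # EOF on this pipe: stop watching it but keep draining the other one
            sel.unregister(key.fileobj)
            key.fileobj.close()
            continue
        if key.fileobj is p.stdout:
            print(data, end="")
        else:
            print(data, end="", file=sys.stderr)

p.wait()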

If you want a test program (random_out.py), use this:

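import sys
from time import sleep


for i in range(10):
    # flush=True so each write reaches the pipe immediately instead of sitting in a buffer
    print(f" x{i} ", file=sys.stderr, end="", flush=True)
    sleep(0.1)
    print(f" y{i} ", end="", flush=True)
    sleep(0.1)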
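A Unix-only sketch of the same selectors idea that registers the raw file descriptors, reads them with os.read(), and only stops once both pipes have reached EOF. The helper name read_chunks and the inline child script are illustrative assumptions. As the comments below note, this still cannot recover the true write order; it only reports chunks in the order they become readable.

```python
import os
import selectors
import subprocess
import sys


def read_chunks(cmd, maxread=1024):
    """Run cmd and return (stream_name, bytes) chunks in the order they were read.

    Unix-only sketch: on Windows, selectors cannot wait on pipes.
    """
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    sel = selectors.DefaultSelector()
    # Register raw fds so no Python-level buffer can hide data from select()
    sel.register(p.stdout.fileno(), selectors.EVENT_READ, "stdout")
    sel.register(p.stderr.fileno(), selectors.EVENT_READ, "stderr")
    chunks = []
    while sel.get_map():
        for key, _ in sel.select():
            data = os.read(key.fd, maxread)
            if not data:
                # EOF on this pipe: stop watching it, keep draining the other
                sel.unregister(key.fd)
                continue
            chunks.append((key.data, data))
    sel.close()
    p.stdout.close()
    p.stderr.close()
    p.wait()
    return chunks


# Hypothetical child writing to both streams
child = ("import sys\n"
         "sys.stdout.write('x' * 10); sys.stdout.flush()\n"
         "sys.stderr.write('y' * 10); sys.stderr.flush()\n")
chunks = read_chunks([sys.executable, "-c", child])
for name, data in chunks:
    print(name, data)
```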

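Comments:

Looks like this is the culprit - ***.com/questions/375427/…. Selectors don't work with pipes on Windows :(

As an obvious and trivial improvement, get rid of the shell=True

Note: 1- it does not work on Windows 2- it does not preserve the order (it only makes it less likely that you notice the order being wrong). See related comments under my answer

Is there a reproducible way to get the wrong order? Maybe some kind of fuzzing?

@shouldsee, does the arbitrary size 1024 work on Python 3.5?

Answer 6:

This works on Python 3 (3.6):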

import selectors
import subprocess
import sys

p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE, universal_newlines=True)
# Read both stdout and stderr simultaneously
sel = selectors.DefaultSelector()
sel.register(p.stdout, selectors.EVENT_READ)
sel.register(p.stderr, selectors.EVENT_READ)
# Keep going until both streams have reached EOF
while sel.get_map():
    for key, _ in sel.select():
        line = key.fileobj.readline()
        if not line:
            # EOF on this stream: stop watching it but keep draining the other one
            sel.unregister(key.fileobj)
            continue
        if key.fileobj is p.stdout:
            print(f"STDOUT: {line}", end="")
        else:
            print(f"STDERR: {line}", end="", file=sys.stderr)
p.wait()

Comments:

Answer 7:

From https://docs.python.org/3/library/subprocess.html#using-the-subprocess-module:

If you wish to capture and combine both streams into one, use stdout=PIPE and stderr=STDOUT instead of capture_output.

So the simplest solution is:

import subprocess

process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout_iterator = iter(process.stdout.readline, b"")

for line in stdout_iterator:
    # Do stuff with line (stdout and stderr lines now arrive mixed in one stream)
    print(line.decode(), end="")
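A runnable sketch of the merged-stream approach, using a hypothetical inline child script. Because stderr is redirected into the same pipe as stdout, lines come out in the order they were written (the child flushes after every print), but, unlike what the question asks for, they are no longer labeled by stream.

```python
import subprocess
import sys

# Hypothetical child that alternates streams, flushing after every line
child = ("import sys\n"
         "print('out 1', flush=True)\n"
         "print('err 1', file=sys.stderr, flush=True)\n"
         "print('out 2', flush=True)\n")

process = subprocess.Popen([sys.executable, "-c", child],
                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                           text=True)
# Both streams share one pipe, so the lines arrive in write order
lines = [line.rstrip("\n") for line in process.stdout]
process.stdout.close()
process.wait()
print(lines)  # → ['out 1', 'err 1', 'out 2']
```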

Comments:
