在 Python 中对 subprocess.PIPE 进行非阻塞读取

Posted

技术标签:

【中文标题】在 Python 中对 subprocess.PIPE 进行非阻塞读取【英文标题】:A non-blocking read on a subprocess.PIPE in Python 【发布时间】:2010-09-27 09:18:06 【问题描述】:

我正在使用subprocess module 启动一个子进程并连接到它的输出流(标准输出)。我希望能够在其标准输出上执行非阻塞读取。在我调用.readline 之前,有没有办法使 .readline 非阻塞或检查流中是否有数据?我希望它是可移植的,或者至少可以在 Windows 和 Linux 下工作。

这是我现在的做法(如果没有可用数据,它会阻止 .readline):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

【问题讨论】:

(来自 google?)当 PIPE 的缓冲区之一被填满且未被读取时,所有 PIPE 都会死锁。例如填充 stderr 时出现 stdout 死锁。切勿通过您不打算阅读的 PIPE。 @NasserAl-Wohaibi 这是否意味着总是创建文件更好? 我一直很想知道为什么它首先被阻止...我问是因为我看到了评论:To avoid deadlocks: careful to: add \n to output, flush output, use readline() rather than read() 这是“按设计”,等待接收输入。 难以置信 12 年这不是 python 本身的一部分:( 【参考方案1】:

fcntlselectasyncproc 在这种情况下无济于事。

不管操作系统如何,一个可靠的读取流的方法是使用Queue.get_nowait()

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

【讨论】:

是的,这对我有用,但我删除了很多。它包括良好的做法,但并不总是必要的。 Python 3.x 2.X compat 和 close_fds 可以省略,它仍然可以工作。但是要注意每件事的作用,不要盲目地复制它,即使它只是有效的! (其实最简单的解决方案就是像 Seb 那样使用线程并做 readline,Qeus 只是获取数据的一种简单方法,还有其他方法,线程就是答案!) 在线程内部,对out.readline 的调用阻塞了线程和主线程,我必须等到 readline 返回,然后才能继续进行其他操作。有什么简单的方法吗? (我正在从我的进程中读取多行,这也是另一个正在执行 DB 和事物的 .py 文件) @Justin: 'out.readline' 不会阻塞它在另一个线程中执行的主线程。 close_fds 绝对不是你想盲目复制到应用程序中的东西...... 如果我无法关闭子进程怎么办,例如。由于异常?即使主线程退出,stdout-reader 线程也不会死,python 会挂起,不是吗?如何解决这个问题? python 2.x 不支持杀死线程,更糟糕的是,不支持中断它们。 :((显然应该处理异常以确保子进程被关闭,但以防万一它不会,你能做什么?)【参考方案2】:

我经常遇到类似的问题;我经常编写的 Python 程序需要能够执行一些主要功能,同时接受来自命令行 (stdin) 的用户输入。简单地将用户输入处理功能放在另一个线程中并不能解决问题,因为readline() 阻塞并且没有超时。如果主要功能已经完成并且不再需要等待进一步的用户输入,我通常希望我的程序退出,但它不能因为readline() 仍然阻塞在等待一行的另一个线程中。我发现这个问题的一个解决方案是使用 fcntl 模块使 stdin 成为一个非阻塞文件:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

在我看来,这比使用 select 或 signal 模块来解决这个问题要干净一些,但它只适用于 UNIX...

【讨论】:

根据文档, fcntl() 可以接收文件描述符或具有 .fileno() 方法的对象。 Jesse's answer 不正确。根据 Guido 的说法,readline 在非阻塞模式下不能正常工作,在 Python 3000 之前不会。bugs.python.org/issue1175#msg56041 如果你想使用 fcntl 将文件设置为非阻塞模式,你必须使用较低的-level os.read() 并自己分离出这些行。将 fcntl 与执行行缓冲的高级调用混合是自找麻烦。 在 Python 2 中 readline 的使用似乎不正确。请参阅 anonnn 的回答 ***.com/questions/375427/… 请不要使用繁忙的循环。使用带有超时的 poll() 来等待数据。 @Stefano buffer_size 的定义是什么?【参考方案3】:

Python 3.4 为异步 IO 引入了新的 provisional API -- asyncio module。

该方法类似于twisted-based answer by @Bryan Ward——定义一个协议,并在数据准备好后立即调用其方法:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

见"Subprocess" in the docs。

有一个高级接口asyncio.create_subprocess_exec(),它返回Process objects,允许使用StreamReader.readline() coroutine异步读取一行 (async/await Python 3.5+ syntax):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() 执行以下任务:

启动子进程,将其标准输出重定向到管道 从子进程的标准输出异步读取一行 杀死子进程 等待它退出

如有必要,每个步骤都可以受到超时秒数的限制。

【讨论】:

当我使用 python 3.4 协程尝试这样的事情时,我只有在整个脚本运行后才能得到输出。只要子进程打印一行,我就希望看到一行输出。这就是我得到的:pastebin.com/qPssFGep. @flutefreak7: buffering issues 与当前问题无关。按照链接获取可能的解决方案。 谢谢!通过简单地使用print(text, flush=True) 解决了我的脚本的问题,以便调用readline 的观察者可以立即使用打印的文本。当我用基于 Fortran 的可执行文件测试它时,我实际上想要包装/观察,它不会缓冲它的输出,所以它的行为符合预期。 是否可以让子进程持久化并执行进一步的读/写操作。 readline_and_kill,在您的第二个脚本中,与subprocess.comunicate 的工作方式非常相似,因为它会在一次读/写操作后终止进程。我还看到您使用的是单个管道stdout,它的子进程处理为非阻塞。尝试同时使用 stdoutstderr I find I end up blocking。 @Carel 答案中的代码按照答案中明确描述的方式工作。如果需要,可以实现其他行为。如果使用,两个管道同样是非阻塞的,这是一个示例 how to read from both pipes concurrently。【参考方案4】:

试试asyncproc 模块。例如:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

该模块负责按照 S.Lott 的建议处理所有线程。

【讨论】:

绝对精彩。比原始子流程模块容易得多。在 Ubuntu 上非常适合我。 asyncproc 在 windows 上不起作用,windows 不支持 os.WNOHANG :-( asyncproc 是 GPL,这进一步限制了它的使用:-( 谢谢。一件小事:似乎在 asyncproc.py 中用 8 个空格替换制表符是可行的方法:) 您似乎无法通过 asyncproc 模块获取您启动的进程的返回码;只有它生成的输出。【参考方案5】:

您可以在Twisted 中轻松做到这一点。根据您现有的代码库,这可能不是那么容易使用,但如果您正在构建一个扭曲的应用程序,那么这样的事情几乎变得微不足道。您创建一个ProcessProtocol 类,并覆盖outReceived() 方法。 Twisted(取决于所使用的反应器)通常只是一个大的select() 循环,其中安装了回调来处理来自不同文件描述符(通常是网络套接字)的数据。所以outReceived() 方法只是安装一个回调来处理来自STDOUT 的数据。演示此行为的简单示例如下:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

Twisted documentation 有一些很好的信息。

如果您围绕 Twisted 构建整个应用程序,它会与其他进程(本地或远程)进行异步通信,就像这样非常优雅。另一方面,如果你的程序不是建立在 Twisted 之上的,那么这真的不会有那么大的帮助。希望这对其他读者有所帮助,即使它不适用于您的特定应用程序。

【讨论】:

不好。根据docsselect 不应该在带有文件描述符的窗口上工作 @naxa 我认为他所指的select() 与您不同。我假设这是因为 Twisted 在 Windows 上工作... 我添加了similar solution based on asyncio from stdlib。 “扭曲(取决于所使用的反应器)通常只是一个大的 select() 循环”意味着有几个反应器可供选择。 select() 是在 unix 和类 unix 上最便携的一个,但也有两个可用于 Windows 的反应器:twistedmatrix.com/documents/current/core/howto/…【参考方案6】:

在类 Unix 系统和 Python 3.5+ 上,os.set_blocking 与它所说的完全一样。

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

这个输出:

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'

os.set_blocking 评论它是:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''

【讨论】:

这是迄今为止最优雅的解决方案,感谢您让我度过了愉快的一天(实际上是晚上^^) 非常优雅,非常高效。感谢这个解决方案,它工作得很好! 谢谢!这在使用带有SelectorPopen 管道时非常有用,以确保它永远不会阻塞。【参考方案7】:

使用选择和读取(1)。

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

对于 readline() 类:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a

【讨论】:

不好。根据docsselect 不应该在带有文件描述符的窗口上工作 天啊。一次读取兆字节,或者可能是千兆字节……这是我很长时间以来看到的最糟糕的想法……不用说,这段代码不起作用,因为proc.stdout.read()无论多么小参数是一个阻塞调用。 OSError: [WinError 10093] Either the application has not called WSAStartup, or WSAStartup failed【参考方案8】:

一种解决方案是创建另一个进程来执行您对该进程的读取,或者使该进程的线程超时。

这是超时函数的线程版本:

http://code.activestate.com/recipes/473878/

但是,您需要在标准输出进入时读取它吗? 另一种解决方案可能是将输出转储到文件并使用 p.wait() 等待进程完成。

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

【讨论】:

似乎recpie's 线程在超时后不会退出并且杀死它取决于能够杀死它读取的子进程(sg。否则在这方面无关)​​(你应该能够但是以防万一你不能......)。【参考方案9】:

这是我的代码,用于尽快捕获子流程的每个输出,包括部分行。它同时抽水,stdout 和 stderr 以几乎正确的顺序抽水。

在 Python 2.7 linux 和 windows 上测试并正确运行。

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

【讨论】:

少数答案之一,可让您阅读不一定以换行符结尾的内容。 虽然您的解决方案是最接近我没有丢失输入的解决方案,但使用上述代码连续运行数百次类似“cat /some/big/file”的内容,并将每个输出与最后一个输出进行比较一个会显示差异并最终出现一些(罕见的)无法捕获整个输出的时间。 Hmmm.. 不是整个文件——因为开头缺少某些东西(即它在 io.open 完成之前发送数据),或者因为文件末尾的某些东西(在耗尽之前退出所有输入)?【参考方案10】:

免责声明:这只适用于龙卷风

您可以通过将 fd 设置为非阻塞,然后使用 ioloop 注册回调来做到这一点。我已经将它打包在一个名为 tornado_subprocess 的鸡蛋中,您可以通过 PyPI 安装它:

easy_install tornado_subprocess

现在你可以这样做了:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

您也可以将它与 RequestHandler 一起使用

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

【讨论】:

感谢您提供的好功能!澄清一下,为什么我们不能简单地使用threading.Thread 来创建新的非阻塞进程?我在 Tornado websocket 实例的on_message 中使用了它,它做得很好。 在龙卷风中大多不鼓励线程。它们适用于小型、短期运行的功能。你可以在这里阅读:***.com/questions/7846323/tornado-web-and-threadsgithub.com/facebook/tornado/wiki/Threading-and-concurrency @VukasinToroman 你真的用这个救了我。非常感谢你的 tornado_subprocess 模块 :) 这在 Windows 上有效吗? (注意select,带有文件描述符,does not) 这个库不使用select 调用。我没有在 Windows 下尝试过,但你可能会遇到麻烦,因为 lib 使用的是 fcntl 模块。简而言之:不,这可能在 Windows 下不起作用。【参考方案11】:

现有的解决方案对我不起作用(详情如下)。最终奏效的是使用 read(1) 实现 readline(基于this answer)。后者不阻塞:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

为什么现有的解决方案不起作用:

    需要 readline 的解决方案(包括基于队列的解决方案)总是阻塞。很难(不可能?)杀死执行 readline 的线程。它只会在创建它的进程完成时被杀死,但不会在输出生成进程被杀死时被杀死。 正如 anonnn 指出的那样,将低级 fcntl 与高级 readline 调用混合可能无法正常工作。 使用 select.poll() 很简洁,但根据 python 文档,在 Windows 上不起作用。 使用第三方库对于这项任务来说似乎有些矫枉过正,并且会增加额外的依赖项。

【讨论】:

1. q.get_nowait() from my answer 永远不能阻塞,这就是使用它的意义所在。 2. 执行 readline (enqueue_output() function) 的线程在 EOF 上退出,例如,包括输出生成进程被杀死的情况。如果你认为不是这样;请提供a complete minimal code example 以其他方式显示(可能是new question)。 @sebastian 我花了一个小时或更长时间试图想出一个最小的例子。最后,我必须同意您的答案处理所有情况。我想这对我来说没有用,因为当我试图杀死输出产生过程时,它已经被杀死并给出了一个难以调试的错误。这个小时花得很好,因为在想出一个最小的例子的同时,我可以想出一个更简单的解决方案。 您也可以发布更简单的解决方案吗? :)(如果它与塞巴斯蒂安的不同) @danger89:我认为dcmpid = myprocess. 在 read() 调用之后的条件下(就在 while True 之后):out 永远不会是空字符串,因为您至少读取了长度为 1 的字符串/字节。【参考方案12】:

我添加这个问题来读取一些 subprocess.Popen 标准输出。 这是我的非阻塞读取解决方案:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

【讨论】:

fcntl 在 Windows 上不起作用,根据docs。 @anatolytechtonik 改用msvcrt.kbhit()【参考方案13】:

现代 Python 的情况要好得多。

这是一个简单的子程序“hello.py”:

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello i")

还有一个与之交互的程序:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

打印出来:

b'hello bob\n'
b'hello alice\n'

请注意,几乎所有先前的答案(包括此处和相关问题)中的实际模式是将子项的 stdout 文件描述符设置为非阻塞,然后在某种选择循环中对其进行轮询。当然,现在这个循环是由 asyncio 提供的。

【讨论】:

imo 这是最好的答案,它实际上在后台使用 Windows 重叠/异步读/写(相对于一些线程变体来处理阻塞)。根据文档,您应该调用 drain() 以确保 write(..) 实际通过【参考方案14】:

此版本的非阻塞读取不需要需要特殊模块,并且可以在大多数 Linux 发行版上开箱即用。

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

【讨论】:

【参考方案15】:

这是一个基于线程的简单解决方案:

适用于 Linux 和 Windows(不依赖于 select)。 异步读取stdoutstderr。 不依赖于具有任意等待时间的主动轮询(CPU 友好)。 不使用asyncio(可能与其他库冲突)。 一直运行到子进程终止。

printer.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()

【讨论】:

【参考方案16】:

我有原始提问者的问题,但不想调用线程。我将 Jesse 的解决方案与来自管道的直接read() 和我自己的用于行读取的缓冲区处理程序混合在一起(但是,我的子进程 - ping - 总是写完整的行

def set_up_ping(ip, w):
    # run the sub-process
    # watch the resultant pipe
    p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
    # make stdout a non-blocking file
    fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
    fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
    return stdout_gid # for shutting down

观察者是

def watch(f, *other):
    print 'reading',f.read()
    return True

并且主程序设置一个ping然后调用gobject邮件循环。

def main():
    set_up_ping('192.168.1.8', watch)
    # discard gid as unused here
    gobject.MainLoop().run()

任何其他工作都附加到 gobject 中的回调。

【讨论】:

【参考方案17】:

在此处添加此答案,因为它提供了在 Windows 和 Unix 上设置非阻塞管道的能力。

所有ctypes 的详细信息都感谢@techtonik's answer。

有一个稍作修改的版本可在 Unix 和 Windows 系统上使用。

Python3 兼容(只需要小改动)。 包括 posix 版本,并定义了用于任一版本的异常。

这样您就可以对 Unix 和 Windows 代码使用相同的函数和异常。

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://***.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

为了避免读取不完整的数据,我最终编写了自己的 readline 生成器(它返回每一行的字节字符串)。

它是一个生成器,因此您可以...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

【讨论】:

(1) this comment 表示readline() 不适用于Python 2 上的非阻塞管道(例如使用fcntl 设置)——你认为它不再正确吗? (我的答案包含提供相同信息但现在似乎已删除的链接(fcntl)。 (2) 看看multiprocessing.connection.Pipe如何使用SetNamedPipeHandleState 我只在 Python3 上测试过这个。但也看到了这些信息,并期望它仍然有效。我还编写了自己的代码来代替 readline,我已经更新了答案以包含它。【参考方案18】:

select 模块可帮助您确定下一个有用的输入在哪里。

但是,您几乎总是对单独的线程更满意。一个执行阻塞读取标准输入,另一个执行您不想阻塞的任何位置。

【讨论】:

我认为这个答案没有帮助,原因有两个:(a) select 模块在 Windows 下的管道上不起作用(正如提供的链接明确指出的那样),它击败了OP 打算拥有一个可移植的解决方案。 (b) 异步线程不允许父进程和子进程之间进行同步对话。如果父进程想要​​根据从子进程读取的下一行来调度下一个动作怎么办?! select 也没有用,因为 Python 的读取即使在 select 之后也会阻塞,因为它没有标准的 C 语义并且不会返回部分数据。 从孩子的输出中读取的单独阈值解决了与此类似的问题。如果您需要同步交互,我想您不能使用此解决方案(除非您知道预期的输出)。我会接受这个答案【参考方案19】:

为什么要麻烦线程和队列? 与 readline() 不同,BufferedReader.read1() 不会阻塞等待 \r\n,如果有任何输出进来,它会尽快返回。

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

【讨论】:

如果没有任何东西进来,它会尽快返回吗?如果没有,则表示阻塞。 @MathieuPagé 是对的。 read1 将在第一个底层读取阻塞时阻塞,当管道仍然打开但没有输入可用时发生。【参考方案20】:

在我的例子中,我需要一个日志模块来捕获后台应用程序的输出并对其进行扩充(添加时间戳、颜色等)。

我最终得到了一个执行实际 I/O 的后台线程。以下代码仅适用于 POSIX 平台。我剥离了非必要的部分。

如果有人打算长期使用这种野兽,请考虑管理开放描述符。就我而言,这不是什么大问题。

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = 
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

【讨论】:

【参考方案21】:

我的问题有点不同,因为我想从正在运行的进程中收集 stdout 和 stderr,但最终相同,因为我想在生成的小部件中呈现输出。

我不想求助于使用队列或其他线程的许多建议的解决方法,因为它们对于执行诸如运行另一个脚本并收集其输出这样的常见任务是不必要的。

在阅读了建议的解决方案和 python 文档后,我通过以下实现解决了我的问题。是的,它仅适用于 POSIX,因为我正在使用 select 函数调用。

我同意文档令人困惑,并且对于这样一个常见的脚本任务,实现起来很尴尬。我相信旧版本的 python 对Popen 有不同的默认值和不同的解释,因此造成了很多混乱。这似乎适用于 Python 2.7.12 和 3.5.2。

关键是将bufsize=1 设置为行缓冲,然后将universal_newlines=True 设置为文本文件而不是二进制文件,这在设置bufsize=1 时似乎成为默认值。

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR、DEBUG 和 VERBOSE 只是将输出打印到终端的宏。

恕我直言,这个解决方案的效率为 99.99%,因为它仍然使用阻塞 readline 函数,所以我们假设子进程很好并且输出完整的行。

我欢迎提供反馈以改进解决方案,因为我还是 Python 新手。

【讨论】:

在这种特殊情况下,您可以在 Popen 构造函数中设置 stderr=subprocess.STDOUT,并从 cmd.stdout.readline() 获取所有输出。 好清晰的例子。 select.select() 有问题,但这为我解决了。【参考方案22】:

不是第一个也可能不是最后一个,我构建了一个包,它使用两种不同的方法进行非阻塞 stdout PIPE 读取,一种基于 JF Sebastian (@jfs) 的回答,另一种是带有线程的简单通信()循环以检查超时。

两种标准输出捕获方法都经过测试,可在 Linux 和 Windows 下工作,截至撰写本文时,Python 版本从 2.7 到 3.9

由于是非阻塞的,它保证了超时强制执行,即使有多个子进程和孙子进程,甚至在 Python 2.7 下也是如此。

该包还处理字节和文本标准输出编码,在尝试捕获 EOF 时是一场噩梦。

你可以在https://github.com/netinvent/command_runner找到这个包

如果您需要一些经过良好测试的非阻塞读取实现,请尝试一下(或破解代码):

pip install command_runner

from command_runner import command_runner

exit_code, output = command_runner('ping 127.0.0.1', timeout=3)
exit_code, output = command_runner('echo hello world, shell=True)
exit_code, output = command_runner('some command', stdout='some_file')

您可以在_poll_process()_monitor_process() 中找到核心非阻塞读取代码,具体取决于所采用的捕获方法。 从那里,你可以破解你想要的方式,或者简单地使用整个包来执行你的命令作为子进程的替换。

【讨论】:

【参考方案23】:

我创建了一个基于J. F. Sebastian's solution 的库。你可以使用它。

https://github.com/cenkalti/what

【讨论】:

【参考方案24】:

根据 J.F. Sebastian 的回答和其他几个来源,我整理了一个简单的子流程管理器。它提供请求非阻塞读取,以及并行运行多个进程。它不使用任何特定于操作系统的调用(据我所知),因此应该可以在任何地方使用。

它可以从 pypi 获得,所以只需 pip install shelljob。有关示例和完整文档,请参阅 project page。

【讨论】:

【参考方案25】:

编辑:这个实现仍然阻塞。请改用 J.​​F.Sebastian 的 answer。

我尝试了top answer,但是线程代码的额外风险和维护令人担忧。

浏览io module(仅限于2.6),我找到了BufferedReader。这是我的无线程、非阻塞解决方案。

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line

【讨论】:

你试过for line in iter(p.stdout.readline, ""): # do stuff with the line吗?它是无线程的(单线程)并且在您的代码阻塞时阻塞。 @j-f-sebastian 是的,我最终回复了你的答案。我的实现仍然偶尔会被阻止。我将编辑我的答案以警告其他人不要走这条路。【参考方案26】:

这是一个在子进程中运行交互命令的例子,stdout是使用伪终端交互的。可以参考:https://***.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

【讨论】:

【参考方案27】:

此解决方案使用select 模块从 IO 流中“读取任何可用数据”。此函数最初会阻塞,直到数据可用,但随后仅读取可用的数据并且不会进一步阻塞。

鉴于它使用 select 模块,这仅适用于 Unix。

代码完全符合 PEP8。

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer

【讨论】:

【参考方案28】:

我也遇到了Jesse 描述的问题,并通过像Bradley、Andy 和其他人一样使用“选择”来解决它,但在阻塞模式下避免了繁忙的循环。它使用虚拟管道作为假标准输入。选择块并等待标准输入或管道准备好。当一个键被按下时,标准输入解除阻塞选择,并且可以使用 read(1) 检索键值。当不同的线程写入管道时,管道会解除对选择的阻塞,这可以被视为对标准输入的需求已经结束的指示。这是一些参考代码:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()

【讨论】:

注意:为了在 Windows 中进行这项工作,管道应替换为套接字。我还没有尝试过,但它应该根据文档工作。【参考方案29】:

试试wexpect,这是pexpect的Windows替代品。

import wexpect

p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.')               // regex pattern of any character
output_str = p.after()

【讨论】:

【参考方案30】:

这是一个python中支持非阻塞读取和后台写入的模块:

https://pypi.python.org/pypi/python-nonblock

提供一个函数,

nonblock_read 将从流中读取数据(如果可用),否则返回空字符串(如果流在另一侧关闭并且所有可能的数据都已读取,则返回 None)

您也可以考虑使用 python-subprocess2 模块,

https://pypi.python.org/pypi/python-subprocess2

添加到子流程模块。因此,在从“subprocess.Popen”返回的对象上添加了一个附加方法,runInBackground。这将启动一个线程并返回一个对象,该对象将在内容写入 stdout/stderr 时自动填充,而不会阻塞您的主线程。

享受吧!

【讨论】:

我想试试这个nonblock 模块,但我对一些Linux 程序比较陌生。究竟如何安装这些例程?我正在运行 Raspbian Jessie,这是一种用于 Raspberry Pi 的 Debian Linux。我尝试了 'sudo apt-get install nonblock' 和 python-nonblock 并且都抛出了错误 - 未找到。我已经从这个站点pypi.python.org/pypi/python-nonblock 下载了 zip 文件,但不知道如何处理它。谢谢....RDK

以上是关于在 Python 中对 subprocess.PIPE 进行非阻塞读取的主要内容,如果未能解决你的问题,请参考以下文章

在Python中对关联数组进行排序[重复]

在 Python 中对运行时间进行基准测试

在Python列表中对嵌套字典进行排序? [复制]

在python中对数组列表进行分类

如何在 python 中对 URL 进行分层排序?

在 Python 中对嵌套列表进行排序和分组