python:在线程中读取子进程输出

Posted

技术标签:

【中文标题】python:在线程中读取子进程输出【英文标题】:python: reading subprocess output in threads 【发布时间】:2012-04-06 08:56:54 【问题描述】:

我有一个使用 subprocess.Popen 调用的可执行文件。然后,我打算使用一个线程通过标准输入向它提供一些数据,该线程从队列中读取其值,稍后将填充到另一个线程中。输出应在另一个线程中使用 stdout 管道读取,并再次在队列中排序。

据我之前的研究了解,将线程与队列一起使用是一种很好的做法。

不幸的是,外部可执行文件不会很快就管道输入的每一行给出答案,因此简单的写入、读取行周期不是一个选项。可执行文件实现了一些内部多线程,我希望在输出可用时立即输出,因此需要额外的读取器线程。

作为一个测试可执行文件的示例,将只打乱每一行(shuffleline.py):

#!/usr/bin/python -u
import sys
from random import shuffle

for line in sys.stdin:
    line = line.strip()

    # shuffle line
    line = list(line)
    shuffle(line)
    line = "".join(line)

    sys.stdout.write("%s\n"%(line))
    sys.stdout.flush() # avoid buffers

请注意,这已尽可能无缓冲。或者不是吗?这是我的精简测试程序:

#!/usr/bin/python -u
import sys
import Queue
import threading
import subprocess

class WriteThread(threading.Thread):
    def __init__(self, p_in, source_queue):
        threading.Thread.__init__(self)
        self.pipe = p_in
        self.source_queue = source_queue

    def run(self):
        while True:
            source = self.source_queue.get()
            print "writing to process: ", repr(source)
            self.pipe.write(source)
            self.pipe.flush()
            self.source_queue.task_done()

class ReadThread(threading.Thread):
    def __init__(self, p_out, target_queue):
        threading.Thread.__init__(self)
        self.pipe = p_out
        self.target_queue = target_queue

    def run(self):
        while True:
            line = self.pipe.readline() # blocking read
            if line == '':
                break
            print "reader read: ", line.rstrip()
            self.target_queue.put(line)

if __name__ == "__main__":

    cmd = ["python", "-u", "./shuffleline.py"] # unbuffered
    proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    source_queue = Queue.Queue()
    target_queue = Queue.Queue()

    writer = WriteThread(proc.stdin, source_queue)
    writer.setDaemon(True)
    writer.start()

    reader = ReadThread(proc.stdout, target_queue)
    reader.setDaemon(True)
    reader.start()

    # populate queue
    for i in range(10):
        source_queue.put("string %s\n" %i)
    source_queue.put("")

    print "source_queue empty: ", source_queue.empty()
    print "target_queue empty: ", target_queue.empty()

    import time
    time.sleep(2) # expect some output from reader thread

    source_queue.join() # wait until all items in source_queue are processed
    proc.stdin.close()  # should end the subprocess
    proc.wait()

这给出以下输出(python2.7):

writing to process:  'string 0\n'
writing to process:  'string 1\n'
writing to process:  'string 2\n'
writing to process:  'string 3\n'
writing to process:  'string 4\n'
writing to process:  'string 5\n'
writing to process:  'string 6\n'
source_queue empty: writing to process:  'string 7\n'
writing to process:  'string 8\n'
writing to process:  'string 9\n'
writing to process:  ''
 True
target_queue empty:  True

然后 2 秒内什么都没有……

reader read:  rgsn0i t
reader read:  nrg1sti
reader read:  tis n2rg
reader read:  snt gri3
reader read:  nsri4 tg
reader read:  stir5 gn
reader read:   gnri6ts
reader read:   ngrits7
reader read:  8nsrt ig
reader read:  sg9 nitr

开头的交错是预期的。然而,子进程的输出直到之后子进程结束时才会出现。随着更多的行输入,我得到了一些输出,因此我假设标准输出管道中存在缓存问题。根据此处发布的其他问题,刷新标准输出(在子进程中)应该可以工作,至少在 Linux 上是这样。

【问题讨论】:

【参考方案1】:

您的问题与subprocess 模块或线程(尽管存在问题)无关,甚至与子进程和线程混合(一个非常的坏主意,甚至比使用线程启动更糟糕) ,除非你使用 Python 3.2 的子进程模块的反向端口,你可以从 code.google.com/p/python-subprocess32 获得)或从多个线程访问相同的东西(就像你的 print 语句所做的那样。)

发生的情况是您的shuffleline.py 程序缓冲了。不是输出,而是输入。虽然不是很明显,但是当你迭代一个文件对象时,Python 会读取块,通常是 8k 字节。由于sys.stdin 是一个文件对象,您的for 循环将缓冲直到EOF 或一个完整块:

for line in sys.stdin:
    line = line.strip()
    ....

如果您不想这样做,请使用 while 循环调用 sys.stdin.readline()(对于 EOF 返回 ''):

while True:
    line = sys.stdin.readline()
    if not line:
        break
    line = line.strip()
    ...

或使用iter() 的双参数形式,它创建一个迭代器,该迭代器调用第一个参数,直到返回第二个参数(“哨兵”):

for line in iter(sys.stdin.readline, ''):
    line = line.strip()
    ...

如果我不建议为此不使用线程,而是在子进程的管道上使用非阻塞 I/O,或者甚至像 twisted.reactor.spawnProcess 这样有很多挂钩进程和其他东西的方法,我也会失职一起作为消费者和生产者。

【讨论】:

请问为什么子进程和线程的混合是一种如此糟糕的方法?这似乎比在什么都没有发生的情况下一遍又一遍地调用非阻塞 I/O 更优雅。显然,线程不应该访问任何非线程安全的数据结构,而只是从队列读取或写入队列似乎是安全的。 Python3.2 backport 的变化对于我这样一个简单的案例重要吗? 线程和子进程的问题具体是线程和fork混合的问题。请参阅linuxprogrammingblog.com/… 和其他此类文章。 Python 3.2 子流程反向移植可以解决这些问题。一般来说,线程的主要问题是它们难以控制和调试。例如,您无法从线程“外部”杀死它们,因此如果线程卡在读取或写入中,您将无能为力。

以上是关于python:在线程中读取子进程输出的主要内容,如果未能解决你的问题,请参考以下文章

读取Python中imagemagick子进程的输出[复制]

如何读取子进程标准输出的第一个字节,然后在 Python 中丢弃其余字节?

如何读取子进程标准输出的第一个字节,然后在 Python 中丢弃其余字节?

从Python中的imagemagick子进程读取输出[重复]

从Java开始忽略/捕获子进程输出的最简单方法

Python 运行守护程序子进程并读取标准输出