Python:使用额外的输入和输出文件向外部程序提供和解析数据流

Posted

技术标签:

【中文标题】Python:使用额外的输入和输出文件向外部程序提供和解析数据流【英文标题】:Python: Feed and parse stream of data to and from external program with additional input and output files 【发布时间】:2015-10-17 17:24:03 【问题描述】:

问题: 我有一个设计不佳的 Fortran 程序(我无法更改它,我坚持使用它),它从标准输入和其他输入文件获取文本输入,并将文本输出结果写入标准输出和其他输出文件。输入输出的大小相当大,我想避免写入硬盘驱动器(操作慢)。我已经编写了一个函数来迭代几个输入文件的行,并且我还有用于多个输出的解析器。我真的不知道程序是先读取所有输入然后开始输出,还是一边读取输入一边开始输出。

目标: 有一个函数可以为外部程序提供所需的内容,并解析来自程序的输出,而无需将数据写入硬盘驱动器上的文本文件。

研究: 使用文件的幼稚方式是:

from subprocess import PIPE, Popen

def execute_simple(cmd, stdin_iter, stdout_parser, input_files, output_files):

    for filename, file_iter in input_files.iteritems():
        with open(filename ,'w') as f:
            for line in file_iter:
                f.write(line + '\n')


    p_sub = Popen(
        shlex.split(cmd),
        stdin = PIPE,
        stdout = open('stdout.txt', 'w'),
        stderr = open('stderr.txt', 'w'),
        bufsize=1
    )
    for line in stdin_iter:
        p_sub.stdin.write(line + '\n')

    p_sub.stdin.close()
    p_sub.wait()

    data = 
    for filename, parse_func in output_files.iteritems():
        # The stdout.txt and stderr.txt is included here
        with open(filename,'r') as f:
            data[filename] = parse_func(
                    iter(f.readline, b'')
            )
    return data

我曾尝试和subprocess 模块一起执行外部程序。额外的输入/输出文件由命名管道和multiprocessing 处理。我想用一个迭代器(它返回输入的行)提供标准输入,将标准错误保存在一个列表中,并解析标准输出,因为它来自外部程序。输入和输出可能相当大,所以使用communicate 是不可行的。

我有一个格式解析器:

def parser(iterator):
    for line in iterator:
        # Do something
        if condition:
            break
    some_other_function(iterator)
    return data

我查看了这个solution 使用select 选择合适的流,但是我不知道如何使它与我的标准输出解析器一起工作以及如何提供标准输入。

我也查看了asyncio 模块,但正如我所见,我在解析 stout 时也会遇到同样的问题。

【问题讨论】:

如果在启动任何线程之前启动 Fortran 程序,您可以从单独的线程开始为 Fortran 程序提供数据,并从主线程收集其输出。或者,您可以启动一个额外的 Python 程序,并将 Fortran 的标准输入传递给它的标准输出,然后您的主程序再次简单地处理 Fortran 程序的标准输出。 我认为你有点混淆了。及时处理(到达时)与异步 IO 不同。对于 Unix 上的异​​步 IO,您可以使用 asyncore 模块。它使用 select,尽管它不完全适用于文件。为什么用文件替换 stdout 和 stderr 而不是 stdout.read() 或 stdout.readlines()?您可以指定字节数,因此您不必等到它完成。如果您担心硬盘驱动器,请在 RAM 内存中挂载一个目录并在那里写入。 我的意思是,select 不是用于异步文件 IO,只是套接字。但是您当然可以检查某些文件是否已准备好执行所需操作。 @Dalen:是的,我认为像twisted 这样的选项可能是一个选项。 据我所知,twisted 不做文件或终端。所以它无能为力。 【参考方案1】:

您应该对 Fortran 程序的所有输入和输出使用命名管道,以避免写入磁盘。然后,在您的消费者中,您可以使用线程从程序的每个输出源中读取数据,并将信息添加到队列中以进行顺序处理。

为了对此进行建模,我创建了一个 Python 应用程序 daemon.py,它从标准输入读取数据并返回平方根,直到 EOF。它将所有输入记录到指定为命令行参数的日志文件中,并将平方根打印到 stdout,将所有错误打印到 stderr。我认为它模拟了您的程序(当然输出文件的数量只有一个,但可以缩放)。您可以查看此测试应用程序的源代码here。注意对stdout.flush() 的显式调用。默认情况下,标准输出是打印缓冲的,这意味着这是在最后输出,消息不会按顺序到达。我希望您的 Fortran 应用程序不会缓冲其输出。我相信我的示例应用程序可能不会在 Windows 上运行,因为 select 仅在 Unix 上使用,这对你来说应该无关紧要。

我的消费者应用程序将守护程序应用程序作为子进程启动,标准输入、标准输出和标准错误重定向到subprocess.PIPEs。这些管道中的每一个都被分配给一个不同的线程,一个提供输入,三个分别处理日志文件、错误和标准输出。他们都将消息添加到共享的Queue,您的主线程会从该共享Queue 中读取并发送到您的解析器。

这是我的消费者代码:

import os, random, time
import subprocess
import threading
import Queue
import atexit

def setup():
    # make a named pipe for every file the program should write
    logfilepipe='logpipe'
    os.mkfifo(logfilepipe)

def cleanup():
    # put your named pipes here to get cleaned up
    logfilepipe='logpipe'
    os.remove(logfilepipe)

# run our cleanup code no matter what - avoid leaving pipes laying around
# even if we terminate early with Ctrl-C
atexit.register(cleanup)

# My example iterator that supplies input for the program. You already have an iterator 
# so don't worry about this. It just returns a random input from the sample_data list
# until the maximum number of iterations is reached.
class MyIter():
    sample_data=[0,1,2,4,9,-100,16,25,100,-8,'seven',10000,144,8,47,91,2.4,'^',56,18,77,94]
    def __init__(self, numiterations=1000):
        self.numiterations=numiterations
        self.current = 0

    def __iter__(self):
        return self

    def next(self):
        self.current += 1
        if self.current > self.numiterations:
            raise StopIteration
        else:
            return random.choice(self.__class__.sample_data)

# Your parse_func function - I just print it out with a [tag] showing its source.
def parse_func(source,line):
    print "[%s] %s" % (source,line)

# Generic function for sending standard input to the problem.
# p - a process handle returned by subprocess
def input_func(p, queue):
    # run the command with output redirected
    for line in MyIter(30): # Limit for testing purposes
        time.sleep(0.1) # sleep a tiny bit
        p.stdin.write(str(line)+'\n')
        queue.put(('INPUT', line))
    p.stdin.close()
    p.wait()

    # Once our process has ended, tell the main thread to quit
    queue.put(('QUIT', True))

# Generic function for reading output from the program. source can either be a
# named pipe identified by a string, or subprocess.PIPE for stdout and stderr.
def read_output(source, queue, tag=None):
    print "Starting to read output for %r" % source
    if isinstance(source,str):
        # Is a file or named pipe, so open it
        source=open(source, 'r') # open file with string name
    line = source.readline()
    # enqueue and read lines until EOF
    while line != '':
        queue.put((tag, line.rstrip()))
        line = source.readline()

if __name__=='__main__':
    cmd='daemon.py'

    # set up our FIFOs instead of using files - put file names into setup() and cleanup()
    setup()

    logfilepipe='logpipe'

    # Message queue for handling all output, whether it's stdout, stderr, or a file output by our command
    lq = Queue.Queue()

    # open the subprocess for command
    print "Running command."
    p = subprocess.Popen(['/path/to/'+cmd,logfilepipe],
                                    stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Start threads to handle the input and output
    threading.Thread(target=input_func, args=(p, lq)).start()
    threading.Thread(target=read_output, args=(p.stdout, lq, 'OUTPUT')).start()
    threading.Thread(target=read_output, args=(p.stderr, lq, 'ERRORS')).start()

    # open a thread to read any other output files (e.g. log file) as named pipes
    threading.Thread(target=read_output, args=(logfilepipe, lq, 'LOG')).start()

    # Now combine the results from our threads to do what you want
    run=True
    while(run):
        (tag, line) = lq.get()
        if tag == 'QUIT':
            run=False
        else:
            parse_func(tag, line)

我的迭代器返回一个随机输入值(其中一些是垃圾,会导致错误)。你的应该是一个替代品。该程序将运行到其输入结束,然后等待子进程完成,然后将QUIT 消息排入主线程。我的parse_func 显然超级简单,只需打印出消息及其来源,但您应该能够使用某些东西。从输出源读取的函数旨在同时使用 PIPE 和字符串 - 不要在主线程上打开管道,因为它们会阻塞直到输入可用。所以对于文件阅读器(例如阅读日志文件),最好让子线程打开文件并阻塞。但是,我们在主线程上生成子进程,因此我们可以将 stdin、stdout 和 stderr 的句柄传递给它们各自的子线程。

部分基于this Python implementation of multitail。

【讨论】:

感谢您的回复。我遇到了一些问题: 1,这个脚本没有以信号结束,它只是挂起。 2、为什么MyIter中一定要有time.sleep(0.1)? . 3,队列在执行之间获得交替的行数。 感谢您的回复。我想知道你是否觉得它有用。至于第 1 点,您可以使用信号模块或通过捕获 KeyboardInterrupt 来检查 SIGINT(请参阅this answer)。你可以在while(run): 循环中检查它,也许还可以实现一些线程清理。对于第 2 点 - 我将 sleep 放在那里以减慢输出速度,因此它是可读的,您可以看到发生了什么,但这不是必需的。我会为此添加评论。 我认为这种方法是正确的方法,尽管我还没有成功地使它在我的应用程序中工作。我会尝试捕捉信号的。【参考方案2】:

如果您在发送新作业之前等待结果结束,那么 Fortran 程序在每个作业结束时调用 flush 非常重要(也可以经常是 moe)。 该命令取决于编译器,例如GNU fortran CALL FLUSH(unitnumber) 或者可以通过关闭输出并再次打开进行附加来模拟。

您还可以轻松地在末尾写几行带有许多空白字符的空行,以填充缓冲区大小并获得新的数据块。 5000 个空白字符可能就足够了,但不会太多,以免阻塞管道的 Fortran 端。如果您在发送新作业后立即读取这些空行,您甚至不需要非阻塞读取。作业的最后一行可以在数字应用程序中轻松识别。如果您要编写“聊天”应用程序,则需要其他人编写的内容。

【讨论】:

这值得考虑,但这并不能解决整个问题。从fortran程序中任意输入和解析输出的解决方案更像是一个“聊天”应用。 您实际上可以使用可以执行任意数量操作的程序。 OP 说他坚持使用特定的 Fortran 程序并且无法更改它;假设应该是(a)他知道他在说什么;并且(b)它可以工作,因为他坚持使用它的原因是它实际上是工作...... @PatrickMaupin:了解问题所在。我记得有一个程序在终端中正常工作,但如果它识别出它的标准输出是一个管道,它就不会被刷新接受。一种解决方案是以编程方式(复杂)创建和控制虚拟终端,另一种解决方案是通过关闭程序的输入来终止程序并频繁地再次启动它来强制刷新。 (非常适合简单的程序,不适用于非平凡的程序)然后考虑远程端真的很有趣,即使我最初禁止它。 Python pty 模块在 linux 下运行良好,并且程序不太可能关心除 stdin/stdout 之外的其余文件,因此命名管道可能适用于这些文件,除非 Fortran 程序是正在寻找。

以上是关于Python:使用额外的输入和输出文件向外部程序提供和解析数据流的主要内容,如果未能解决你的问题,请参考以下文章

Java中流-----个人总结心得

微信小程序怎么向外部php文件传递参数

part12:Python 文件I/O(pathlib模块:PurePathPath,os.path,fnmatch,open,with,linecache,os模块操作文件和目录,tempfile(

Python文件操作 I/O

使用 Apache Commons Exec 向命令提供多个输入并提取输出时出现问题

向 Cordova AppDelegate iOS 添加额外的委托