如何将数据写入 Python shell 管道中第一个进程的标准输入?

Posted

技术标签:

【中文标题】如何将数据写入 Python shell 管道中第一个进程的标准输入?【英文标题】:How to write data to stdin of the first process in a Python shell pipeline? 【发布时间】:2015-08-22 02:58:45 【问题描述】:

在围绕 Python 子流程管道的讨论中,我看到该代码 sn-p 被大量引用。必填链接:https://docs.python.org/3.4/library/subprocess.html#replacing-shell-pipeline

稍作修改:

p1 = subprocess.Popen(['cat'],
                      stdin=subprocess.PIPE,
                      stdout=subprocess.PIPE)
p2 = subprocess.Popen(['head', '-n', '1'],
                      stdin=p1.stdout,
                      stdout=subprocess.PIPE)
# Allow p1 to receive a SIGPIPE if p2 exits.
p1.stdout.close()
output = p2.communicate()[0]

除了简洁地展示挑战之外,这个 shell 管道毫无意义。输入"abc\ndef\nghi\n"output 中只应捕获"abc\n"

将数据写入p1.stdin 的最佳方式是什么?我知道subprocess.Popen.communicate()input 参数,但它在管道中不起作用。此外,解决方案需要正确处理阻塞。

我的猜测:对communicate() 背后的代码进行逆向工程,并为这个特定问题创建另一个版本。在我这样做之前,我想问一下是否有一个我不知道的更简单的解决方案。

【问题讨论】:

【参考方案1】:

写信给p1.stdin,然后在调用p2.communicate()之前关闭它:

In [1]: import subprocess

In [2]: %cpaste
Pasting code; enter '--' alone on the line to stop or use Ctrl-D.
:p1 = subprocess.Popen(['cat'],
:                      stdin=subprocess.PIPE,
:                      stdout=subprocess.PIPE)
:p2 = subprocess.Popen(['head', '-n', '1'],
:                      stdin=p1.stdout,
:                      stdout=subprocess.PIPE)
:p1.stdout.close()
:--

In [3]: p1.stdin.write(b'This is the first line.\n')
Out[3]: 24

In [4]: p1.stdin.write(b'And here is the second line.\n')
Out[4]: 29

In [5]: p1.stdin.close()

In [6]: p2.communicate()
Out[6]: (b'This is the first line.\n', None)

(不要忘记您发送到cat 的数据中的换行符,否则它将不起作用。)

【讨论】:

我认为如果标准输入的数据非常大,或者进程阻塞标准输入,您的解决方案将阻塞。根据我的经验,这很常见。 如果子进程阻塞(不处理)标准输入,你几乎无能为力。然后write 调用应该引发BlockingError,这将向您发出管道被阻塞的信号,您应该稍后再试。 @kevinarpe:只要在第一个子进程完成读取之前第二个子进程没有开始写入;不应该有任何问题。否则(正如您所提到的)可能会出现死锁。 @ Roland Smith:如果输入和输出是并发的,非阻塞 io 会有所帮助。 @J.F.Sebastian 但是如何强制非阻塞 io?我在io 模块中看不到任何控制它的东西。您可以在使用open() 时控制缓冲,但由于子进程的管道已为您打开,我看不出有什么强制方法。 @RolandSmith:“如何”取决于“你想要什么”,例如,您可以使用fcntl 为管道设置O_NONBLOCK 标志,以便.write() 会引发相应的异常到EAGAINEWOULDBLOCK errno。就我个人而言,如果我需要异步 I/O,我更喜欢基于 threading、asyncio、twisted、gevent、select modules 的解决方案。【参考方案2】:

您需要同时调用p1.communicate(b"abc\ndef\nghi\n")output = p2.communicate()[0] 的等效项。一种可移植的方法是使用线程或asyncio

【讨论】:

【参考方案3】:

作为一个工作示例:

import subprocess, threading

# Unmodified from original code
p1 = subprocess.Popen(['cat'],
                      stdin=subprocess.PIPE,
                      stdout=subprocess.PIPE)
p2 = subprocess.Popen(['head', '-n', '1'],
                      stdin=p1.stdout,
                      stdout=subprocess.PIPE)

# New code: Start a thread that writes stdin, and closes it when complete
def write_stdin():
    p1.stdin.write("abc\ndef\nghi\n")
    p1.stdin.close()

write_t = threading.Thread(target = write_stdin)
write_t.start()

# Unmodified from original code
p1.stdout.close()
output = p2.communicate()[0]

【讨论】:

以上是关于如何将数据写入 Python shell 管道中第一个进程的标准输入?的主要内容,如果未能解决你的问题,请参考以下文章

如何高效地向Redis写入大量的数据

如何在 shell 管道中使用不同的文件描述符?

从 java 程序调用 unix shell?从 java 程序中向 unix 管道读取和写入稳定的数据流

Python - 如何使用管道执行shell命令,但没有'shell = True'?

如何使用 Python 将标准输入/标准输出通过管道传输到 Perl 脚本

检测命名管道的关闭