如何使用 subprocess.Popen 通过管道连接多个进程?

Posted

技术标签:

【中文标题】如何使用 subprocess.Popen 通过管道连接多个进程?【英文标题】:How do I use subprocess.Popen to connect multiple processes by pipes? 【发布时间】:2010-09-22 16:00:49 【问题描述】:

如何使用 Python subprocess 模块执行以下 shell 命令?

echo "input data" | awk -f script.awk | sort > outfile.txt

输入数据将来自一个字符串,所以我实际上不需要echo。我已经走到这一步了,谁能解释我如何让它通过sort 也可以通过管道传输?

p_awk = subprocess.Popen(["awk","-f","script.awk"],
                          stdin=subprocess.PIPE,
                          stdout=file("outfile.txt", "w"))
p_awk.communicate( "input data" )

更新:请注意,虽然下面接受的答案实际上并没有回答所提出的问题,但我相信 S.Lott 是正确的,最好避免首先解决该问题!

【问题讨论】:

【参考方案1】:

你会更高兴以下。

import subprocess

awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
    stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input data\n" )

将部分工作委托给 shell。让它通过管道连接两个进程。

您会更高兴将“script.awk”重写为 Python,消除 awk 和管道。

编辑。建议 awk 没有帮助的一些原因。

[通过 cmets 响应的理由太多。]

    Awk 正在添加一个没有重要价值的步骤。 awk 的处理没有什么独特之处是 Python 无法处理的。

    对于大型数据集,从 awk 到排序的流水线可能会缩短处理时间。对于短数据集,它没有显着的好处。对awk >file ; sort fileawk | sort 的快速测量将揭示并发帮助。使用排序,它很少有帮助,因为排序不是一次性过滤器。

    “Python 到排序”处理(而不是“Python 到 awk 到排序”)的简单性避免了在此处提出确切类型的问题。

    Python - 虽然比 awk 更冗长 - 也是显式的,其中 awk 具有某些对新手不透明的隐含规则,并且让非专业人士感到困惑。

    Awk(就像 shell 脚本本身一样)添加了另一种编程语言。如果所有这些都可以用一种语言(Python)完成,那么消除 shell 和 awk 编程就消除了两种编程语言,让人们可以专注于任务中产生价值的部分。

底线:awk 不能增加重要的价值。在这种情况下,awk 是净成本;它增加了足够的复杂性,因此有必要提出这个问题。删除 awk 将是一个净收益。

侧边栏为什么构建管道 (a | b) 如此困难。

当 shell 遇到a | b 时,它必须执行以下操作。

    fork 原始 shell 的子进程。这最终会变成b。

    构建一个操作系统管道。 (不是 Python subprocess.PIPE),而是调用 os.pipe(),它返回两个通过公共缓冲区连接的新文件描述符。此时,该进程具有来自其父进程的标准输入、标准输出、标准错误,以及一个将是“a 的标准输出”和“b 的标准输入”的文件。

    叉一个孩子。孩子用新的 a 的标准输出替换它的标准输出。执行a 进程。

    b 子关闭用新 b 的标准输入替换其标准输入。执行b 进程。

    b 子进程等待 a 完成。

    父级正在等待 b 完成。

我认为上面可以递归使用来生成a | b | c,但是你必须隐式地为长管道加上括号,将它们视为a | (b | c)

由于 Python 有 os.pipe()os.exec()os.fork(),并且您可以替换 sys.stdinsys.stdout,因此有一种方法可以在纯 Python 中完成上述操作。确实,您可以使用os.pipe()subprocess.Popen 计算出一些快捷方式。

但是,将该操作委托给 shell 会更容易。

【讨论】:

而且我认为 Awk 实际上非常适合我正在做的事情,代码比等效的 Python 代码更短更简单(毕竟它是一种特定于领域的语言。) -c 告诉外壳程序(您启动的实际应用程序)以下参数是要运行的命令。在这种情况下,命令是一个 shell 管道。 “代码更短”并不——实际上——意味着更简单。它只意味着更短。 awk 有很多假设和隐藏的特性,使得代码很难使用。 Python 虽然更长,但很明确。 当然,我理解您的观点和担忧,并同意在许多情况下,我上面的示例最好用纯 Python 编写。在我的情况下,我还没有准备好这样做,但是因为 awk 脚本可以工作并且已经过调试。迟早,但不是现在。 而且,这并没有改变原来的问题,即如何使用 subprocess.Popen。 awk 和 sort 仅用于说明,因为潜在的回答者可能会让它们进行测试。【参考方案2】:
import subprocess

some_string = b'input_data'

sort_out = open('outfile.txt', 'wb', 0)
sort_in = subprocess.Popen('sort', stdin=subprocess.PIPE, stdout=sort_out).stdin
subprocess.Popen(['awk', '-f', 'script.awk'], stdout=sort_in, 
                 stdin=subprocess.PIPE).communicate(some_string)

【讨论】:

太棒了!我对其进行了修改以制作一个没有 awk 脚本的独立示例,它使用 sed:sam.nipl.net/code/python/pipeline.py @SamWatkins:您的代码中不需要p1.wait()p1.communicate() 收割子进程。 这个答案不是更pythonic更好吗?它不使用 shell=True 在子流程文档中不鼓励使用。我看不出人们投票支持@S.Lott 答案的原因。 @KenT:shell 解决方案更具可读性且不易出错(如果您不接受不受信任的输入)。 pythonic 解决方案将use plumbum (the shell syntax embedded in Python) 或另一个接受类似语法(在字符串中)并为您构造管道的模块(无论本地/bin/sh 做什么,都具有相同的行为)。【参考方案3】:

要模拟 shell 管道:

from subprocess import check_call

check_call('echo "input data" | a | b > outfile.txt', shell=True)

不调用 shell(参见17.1.4.2. Replacing shell pipeline):

#!/usr/bin/env python
from subprocess import Popen, PIPE

a = Popen(["a"], stdin=PIPE, stdout=PIPE)
with a.stdin:
    with a.stdout, open("outfile.txt", "wb") as outfile:
        b = Popen(["b"], stdin=a.stdout, stdout=outfile)
    a.stdin.write(b"input data")
statuses = [a.wait(), b.wait()] # both a.stdin/stdout are closed already

plumbum 提供了一些语法糖:

#!/usr/bin/env python
from plumbum.cmd import a, b # magic

(a << "input data" | b > "outfile.txt")()

类比:

#!/bin/sh
echo "input data" | awk -f script.awk | sort > outfile.txt

是:

#!/usr/bin/env python
from plumbum.cmd import awk, sort

(awk["-f", "script.awk"] << "input data" | sort > "outfile.txt")()

【讨论】:

Plumbum 看起来很漂亮,但我对“魔法”持谨慎态度。这不是 Perl! Plumbum 看起来不错!我不会担心@KyleStrand 的魔力——快速浏览一下文档,你不需要使用“魔法”位,该模块还有其他方法可以做同样的事情——快速浏览一下代码表明魔法是无害的,实际上非常光滑,一点也不讨厌。 @Tom 我不知道,这是很多运算符重载,具有潜在的令人惊讶的含义。我的一部分喜欢它,但我不愿意在任何地方使用它,除非是在个人项目中。 @KyleStrand:总的来说,我同意你的看法,但实际上,人们更有可能错误地构建命令行(例如,通过忘记pipes.quote())或在实现管道时引入错误在 Python 中,even a | b could be implemented with errors. @jfs,如果文件通过 POST 请求使用 cat 或 【参考方案4】:

接受的答案是回避这个问题。 这是一个链接多个进程的输出的 sn-p: 请注意,它还会打印(有点)等效的 shell 命令,以便您可以运行它并确保输出正确。

#!/usr/bin/env python3

from subprocess import Popen, PIPE

# cmd1 : dd if=/dev/zero bs=1m count=100
# cmd2 : gzip
# cmd3 : wc -c
cmd1 = ['dd', 'if=/dev/zero', 'bs=1M', 'count=100']
cmd2 = ['tee']
cmd3 = ['wc', '-c']
print(f"Shell style : ' '.join(cmd1) | ' '.join(cmd2) | ' '.join(cmd3)")

p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE) # stderr=PIPE optional, dd is chatty
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE)
p3 = Popen(cmd3, stdin=p2.stdout, stdout=PIPE)

print("Output from last process : " + (p3.communicate()[0]).decode())

# thoretically p1 and p2 may still be running, this ensures we are collecting their return codes
p1.wait()
p2.wait()
print("p1 return: ", p1.returncode)
print("p2 return: ", p2.returncode)
print("p3 return: ", p3.returncode)

【讨论】:

如果p*.returncode 返回0,我可以假设没有产生错误吗? @Omry 亚丹 您可以确定它返回 0。“产生的错误”定义不明确。它仍然可以将内容打印到标准错误。 据我所知,如果它是空字符串,我还必须检查 stderr 我可以确定是否生成了错误。 这取决于你所说的错误是什么意思。即使没有错误,某些程序也会定期打印到 stderr。 可以这样死锁吗? cmd1 正在写信给 stderr 并且没有任何东西消耗 p1.stderr。如果文件缓冲区已满,操作系统将停止执行p1 进程。 p2 也一样。【参考方案5】:

http://www.python.org/doc/2.5.2/lib/node535.html 很好地涵盖了这一点。有没有你不明白的地方?

您的程序将非常相似,但第二个 Popen 将 stdout= 写入文件,并且您不需要其 .communicate() 的输出。

【讨论】:

我不明白(鉴于文档的示例),如果我说 p2.communicate("input data"),它实际上会发送到 p1.stdin 吗? 你不会的。 p1 的 stdin arg 将设置为 PIPE,您将编写 p1.communicate('foo') 然后通过执行 p2.stdout.read() 获取结果 @Leonid - Python 人不太擅长向后兼容。您可以从docs.python.org/2/library/subprocess.html#popen-objects 获得许多相同的信息,但无论如何我已经用回路机器链接替换了该链接。 没有必要询问是否存在“[OP] 不理解的 [the docs] 的某些部分”。如这个问题所示,您发布的文档部分实际上并未解决将输入传递给第一个进程的问题:***.com/q/6341451/1858225【参考方案6】:

受@Cristian 回答的启发。我遇到了同样的问题,但使用了不同的命令。所以我把我测试过的例子,我相信这可能会有所帮助:

grep_proc = subprocess.Popen(["grep", "rabbitmq"],
                             stdin=subprocess.PIPE, 
                             stdout=subprocess.PIPE)
subprocess.Popen(["ps", "aux"], stdout=grep_proc.stdin)
out, err = grep_proc.communicate()

这是经过测试的。

做了什么

声明惰性grep 使用管道中的标准输入执行。当管道将被ps 的标准输出填充时,该命令将在ps 命令执行时执行。 调用主命令ps,并将标准输出定向到grep 命令使用的管道。 Grep 进行通信以从管道中获取标准输出。

我喜欢这种方式,因为它是用subprocess 接口轻轻包裹的自然管道概念。

【讨论】:

为避免僵尸,请在grep_proc.communicate() 之后调用ps_proc.wait()err 始终为 None,除非您设置了 stderr=subprocess.PIPE【参考方案7】:

之前的答案错过了一个重要的点。正如 geocar 所指出的,Replacing shell pipeline 基本上是正确的。 几乎在管道的最后一个元素上运行 communicate 就足够了。

剩下的问题是将输入数据传递给管道。对于多个子进程,最后一个元素上的简单 communicate(input_data) 不起作用 - 它永远挂起。您需要像这样手动创建一个管道和一个子级:

import os
import subprocess

input = """\
input data
more input
""" * 10

rd, wr = os.pipe()
if os.fork() != 0: # parent
    os.close(wr)
else:              # child
    os.close(rd)
    os.write(wr, input)
    os.close(wr)
    exit()

p_awk = subprocess.Popen(["awk", " print $2; "],
                         stdin=rd,
                         stdout=subprocess.PIPE)
p_sort = subprocess.Popen(["sort"], 
                          stdin=p_awk.stdout,
                          stdout=subprocess.PIPE)
p_awk.stdout.close()
out, err = p_sort.communicate()
print (out.rstrip())

现在子进程通过管道提供输入,父进程调用communicate(),按预期工作。使用这种方法,您可以创建任意长的管道,而无需求助于“将部分工作委托给 shell”。不幸的是,subprocess documentation 没有提到这一点。

有一些方法可以在没有管道的情况下达到同样的效果:

from tempfile import TemporaryFile
tf = TemporaryFile()
tf.write(input)
tf.seek(0, 0)

现在将stdin=tf 用于p_awk。看你喜欢什么口味了。

上面仍然不是 100% 等效于 bash 管道,因为信号处理不同。如果添加另一个截断sort 输出的管道元素,您可以看到这一点,例如head -n 10。使用上面的代码,sort 将向stderr 打印“Broken pipe”错误消息。当您在 shell 中运行相同的管道时,您不会看到此消息。 (这是唯一的区别,stdout 中的结果是相同的)。原因似乎是python的PopenSIG_IGN设置为SIGPIPE,而shell将其留在SIG_DFL,而sort的信号处理在这两种情况下是不同的。

【讨论】:

在最后一个进程上运行通信就足够了,看我的回答。【参考方案8】:

编辑:pipes 在 Windows 上可用,但至关重要的是,在 Windows 上似乎实际上工作。请参阅下面的 cmets。

Python 标准库现在包含用于处理此问题的 pipes 模块:

https://docs.python.org/2/library/pipes.html, https://docs.python.org/3.4/library/pipes.html

我不确定这个模块已经存在了多久,但这种方法似乎比使用 subprocess 简单得多。

【讨论】:

pipes 甚至在 subprocess 模块之前就已经存在。它构建了一个 (*nix) shell 管道(带有"|" 的字符串,在/bin/sh 中执行)。它不是便携式的。它不是 subprocess 模块的替代品,它是可移植的,不需要启动 shell 来运行命令。 pipes 接口是从 Enterprise JavaBeans 是闪亮的新事物的时候开始的(它不是恭维)。您能否提供pipes 代码示例,它“比子流程简单得多”check_call('echo "input data" | a | b &gt; outfile.txt', shell=True)from my answer? @J.F.Sebastian Huh。你的check_call 命令不应该同样不可移植吗? DOS 提供标准(即*NIX 可比,至少 AFAIK)| 行为,那么您希望 pipes 不能在哪些系统上工作?我承认将check_call 与代表你的shell 命令的字符串一起使用可以说与使用pipes 一样简单,但我希望有一些东西可以促进管道的编程构造,而不是仅仅将单个字符串传递给外壳(你的其他例子)。 正如我在评论中所说,plumbum 看起来确实不错——它似乎提供了我正在寻找的简单性、灵活性和功能。但是,语法完全不透明且非 Pythonic。所以我想要的是与标准 *NIX-shell 管道一样简单和容易的东西(如果可能稍微不那么简洁),同时在语法和风格上仍然“看起来”像 Python。 pipes,乍一看,肯定满足这些要求;但是,如果您认为它是不可移植的(您可能是)是对的,那么它当然是最不吸引人的选择。 ...而且,是的,它看起来像 echo hello worldC:\cygwin\bin\tr a-z A-Z 的简单管道在 Windows 上失败,即使 echo hello world | C:\cygwin\bin\tr.exe a-z A-Z 有效。这……奇怪而令人失望。【参考方案9】:

对我来说,下面的方法是最干净和最容易阅读的

from subprocess import Popen, PIPE

def string_to_2_procs_to_file(input_s, first_cmd, second_cmd, output_filename):
    with open(output_filename, 'wb') as out_f:
        p2 = Popen(second_cmd, stdin=PIPE, stdout=out_f)
        p1 = Popen(first_cmd, stdout=p2.stdin, stdin=PIPE)
        p1.communicate(input=bytes(input_s))
        p1.wait()
        p2.stdin.close()
        p2.wait()

可以这样调用:

string_to_2_procs_to_file('input data', ['awk', '-f', 'script.awk'], ['sort'], 'output.txt')

【讨论】:

以上是关于如何使用 subprocess.Popen 通过管道连接多个进程?的主要内容,如果未能解决你的问题,请参考以下文章

如何从 subprocess.Popen 使用 STDIN [重复]

使用 subprocess.Popen 通过 SSH 或 SCP 发送密码

如何将 Subprocess.popen() 与 pyinstaller 一起使用?

如何在 Windows 上使用带有内置命令的 subprocess.Popen

如何使用Python中的subprocess.Popen返回编码值?

实时 subprocess.Popen 通过 stdout 和 PIPE