如何使用 subprocess.Popen 通过管道连接多个进程?
Posted
技术标签:
【中文标题】如何使用 subprocess.Popen 通过管道连接多个进程?【英文标题】:How do I use subprocess.Popen to connect multiple processes by pipes? 【发布时间】:2010-09-22 16:00:49 【问题描述】:如何使用 Python subprocess
模块执行以下 shell 命令?
echo "input data" | awk -f script.awk | sort > outfile.txt
输入数据将来自一个字符串,所以我实际上不需要echo
。我已经走到这一步了,谁能解释我如何让它通过sort
也可以通过管道传输?
p_awk = subprocess.Popen(["awk","-f","script.awk"],
stdin=subprocess.PIPE,
stdout=file("outfile.txt", "w"))
p_awk.communicate( "input data" )
更新:请注意,虽然下面接受的答案实际上并没有回答所提出的问题,但我相信 S.Lott 是正确的,最好避免首先解决该问题!
【问题讨论】:
【参考方案1】:你会更高兴以下。
import subprocess
awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input data\n" )
将部分工作委托给 shell。让它通过管道连接两个进程。
您会更高兴将“script.awk”重写为 Python,消除 awk 和管道。
编辑。建议 awk 没有帮助的一些原因。
[通过 cmets 响应的理由太多。]
Awk 正在添加一个没有重要价值的步骤。 awk 的处理没有什么独特之处是 Python 无法处理的。
对于大型数据集,从 awk 到排序的流水线可能会缩短处理时间。对于短数据集,它没有显着的好处。对awk >file ; sort file
和awk | sort
的快速测量将揭示并发帮助。使用排序,它很少有帮助,因为排序不是一次性过滤器。
“Python 到排序”处理(而不是“Python 到 awk 到排序”)的简单性避免了在此处提出确切类型的问题。
Python - 虽然比 awk 更冗长 - 也是显式的,其中 awk 具有某些对新手不透明的隐含规则,并且让非专业人士感到困惑。
Awk(就像 shell 脚本本身一样)添加了另一种编程语言。如果所有这些都可以用一种语言(Python)完成,那么消除 shell 和 awk 编程就消除了两种编程语言,让人们可以专注于任务中产生价值的部分。
底线:awk 不能增加重要的价值。在这种情况下,awk 是净成本;它增加了足够的复杂性,因此有必要提出这个问题。删除 awk 将是一个净收益。
侧边栏为什么构建管道 (a | b
) 如此困难。
当 shell 遇到a | b
时,它必须执行以下操作。
fork 原始 shell 的子进程。这最终会变成b。
构建一个操作系统管道。 (不是 Python subprocess.PIPE),而是调用 os.pipe()
,它返回两个通过公共缓冲区连接的新文件描述符。此时,该进程具有来自其父进程的标准输入、标准输出、标准错误,以及一个将是“a 的标准输出”和“b 的标准输入”的文件。
叉一个孩子。孩子用新的 a 的标准输出替换它的标准输出。执行a
进程。
b 子关闭用新 b 的标准输入替换其标准输入。执行b
进程。
b 子进程等待 a 完成。
父级正在等待 b 完成。
我认为上面可以递归使用来生成a | b | c
,但是你必须隐式地为长管道加上括号,将它们视为a | (b | c)
。
由于 Python 有 os.pipe()
、os.exec()
和 os.fork()
,并且您可以替换 sys.stdin
和 sys.stdout
,因此有一种方法可以在纯 Python 中完成上述操作。确实,您可以使用os.pipe()
和subprocess.Popen
计算出一些快捷方式。
但是,将该操作委托给 shell 会更容易。
【讨论】:
而且我认为 Awk 实际上非常适合我正在做的事情,代码比等效的 Python 代码更短更简单(毕竟它是一种特定于领域的语言。) -c 告诉外壳程序(您启动的实际应用程序)以下参数是要运行的命令。在这种情况下,命令是一个 shell 管道。 “代码更短”并不——实际上——意味着更简单。它只意味着更短。 awk 有很多假设和隐藏的特性,使得代码很难使用。 Python 虽然更长,但很明确。 当然,我理解您的观点和担忧,并同意在许多情况下,我上面的示例最好用纯 Python 编写。在我的情况下,我还没有准备好这样做,但是因为 awk 脚本可以工作并且已经过调试。迟早,但不是现在。 而且,这并没有改变原来的问题,即如何使用 subprocess.Popen。 awk 和 sort 仅用于说明,因为潜在的回答者可能会让它们进行测试。【参考方案2】:import subprocess
some_string = b'input_data'
sort_out = open('outfile.txt', 'wb', 0)
sort_in = subprocess.Popen('sort', stdin=subprocess.PIPE, stdout=sort_out).stdin
subprocess.Popen(['awk', '-f', 'script.awk'], stdout=sort_in,
stdin=subprocess.PIPE).communicate(some_string)
【讨论】:
太棒了!我对其进行了修改以制作一个没有 awk 脚本的独立示例,它使用 sed:sam.nipl.net/code/python/pipeline.py @SamWatkins:您的代码中不需要p1.wait()
。 p1.communicate()
收割子进程。
这个答案不是更pythonic更好吗?它不使用 shell=True
在子流程文档中不鼓励使用。我看不出人们投票支持@S.Lott 答案的原因。
@KenT:shell 解决方案更具可读性且不易出错(如果您不接受不受信任的输入)。 pythonic 解决方案将use plumbum
(the shell syntax embedded in Python) 或另一个接受类似语法(在字符串中)并为您构造管道的模块(无论本地/bin/sh
做什么,都具有相同的行为)。【参考方案3】:
要模拟 shell 管道:
from subprocess import check_call
check_call('echo "input data" | a | b > outfile.txt', shell=True)
不调用 shell(参见17.1.4.2. Replacing shell pipeline):
#!/usr/bin/env python
from subprocess import Popen, PIPE
a = Popen(["a"], stdin=PIPE, stdout=PIPE)
with a.stdin:
with a.stdout, open("outfile.txt", "wb") as outfile:
b = Popen(["b"], stdin=a.stdout, stdout=outfile)
a.stdin.write(b"input data")
statuses = [a.wait(), b.wait()] # both a.stdin/stdout are closed already
plumbum
提供了一些语法糖:
#!/usr/bin/env python
from plumbum.cmd import a, b # magic
(a << "input data" | b > "outfile.txt")()
类比:
#!/bin/sh
echo "input data" | awk -f script.awk | sort > outfile.txt
是:
#!/usr/bin/env python
from plumbum.cmd import awk, sort
(awk["-f", "script.awk"] << "input data" | sort > "outfile.txt")()
【讨论】:
Plumbum 看起来很漂亮,但我对“魔法”持谨慎态度。这不是 Perl! Plumbum 看起来不错!我不会担心@KyleStrand 的魔力——快速浏览一下文档,你不需要使用“魔法”位,该模块还有其他方法可以做同样的事情——快速浏览一下代码表明魔法是无害的,实际上非常光滑,一点也不讨厌。 @Tom 我不知道,这是很多运算符重载,具有潜在的令人惊讶的含义。我的一部分喜欢它,但我不愿意在任何地方使用它,除非是在个人项目中。 @KyleStrand:总的来说,我同意你的看法,但实际上,人们更有可能错误地构建命令行(例如,通过忘记pipes.quote()
)或在实现管道时引入错误在 Python 中,even a | b
could be implemented with errors.
@jfs,如果文件通过 POST 请求使用 cat 或
【参考方案4】:
接受的答案是回避这个问题。 这是一个链接多个进程的输出的 sn-p: 请注意,它还会打印(有点)等效的 shell 命令,以便您可以运行它并确保输出正确。
#!/usr/bin/env python3
from subprocess import Popen, PIPE
# cmd1 : dd if=/dev/zero bs=1m count=100
# cmd2 : gzip
# cmd3 : wc -c
cmd1 = ['dd', 'if=/dev/zero', 'bs=1M', 'count=100']
cmd2 = ['tee']
cmd3 = ['wc', '-c']
print(f"Shell style : ' '.join(cmd1) | ' '.join(cmd2) | ' '.join(cmd3)")
p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE) # stderr=PIPE optional, dd is chatty
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE)
p3 = Popen(cmd3, stdin=p2.stdout, stdout=PIPE)
print("Output from last process : " + (p3.communicate()[0]).decode())
# thoretically p1 and p2 may still be running, this ensures we are collecting their return codes
p1.wait()
p2.wait()
print("p1 return: ", p1.returncode)
print("p2 return: ", p2.returncode)
print("p3 return: ", p3.returncode)
【讨论】:
如果p*.returncode
返回0,我可以假设没有产生错误吗? @Omry 亚丹
您可以确定它返回 0。“产生的错误”定义不明确。它仍然可以将内容打印到标准错误。
据我所知,如果它是空字符串,我还必须检查 stderr 我可以确定是否生成了错误。
这取决于你所说的错误是什么意思。即使没有错误,某些程序也会定期打印到 stderr。
可以这样死锁吗? cmd1
正在写信给 stderr
并且没有任何东西消耗 p1.stderr
。如果文件缓冲区已满,操作系统将停止执行p1
进程。 p2
也一样。【参考方案5】:
http://www.python.org/doc/2.5.2/lib/node535.html 很好地涵盖了这一点。有没有你不明白的地方?
您的程序将非常相似,但第二个 Popen
将 stdout= 写入文件,并且您不需要其 .communicate()
的输出。
【讨论】:
我不明白(鉴于文档的示例),如果我说 p2.communicate("input data"),它实际上会发送到 p1.stdin 吗? 你不会的。 p1 的 stdin arg 将设置为 PIPE,您将编写 p1.communicate('foo') 然后通过执行 p2.stdout.read() 获取结果 @Leonid - Python 人不太擅长向后兼容。您可以从docs.python.org/2/library/subprocess.html#popen-objects 获得许多相同的信息,但无论如何我已经用回路机器链接替换了该链接。 没有必要询问是否存在“[OP] 不理解的 [the docs] 的某些部分”。如这个问题所示,您发布的文档部分实际上并未解决将输入传递给第一个进程的问题:***.com/q/6341451/1858225【参考方案6】:受@Cristian 回答的启发。我遇到了同样的问题,但使用了不同的命令。所以我把我测试过的例子,我相信这可能会有所帮助:
grep_proc = subprocess.Popen(["grep", "rabbitmq"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
subprocess.Popen(["ps", "aux"], stdout=grep_proc.stdin)
out, err = grep_proc.communicate()
这是经过测试的。
做了什么
声明惰性grep
使用管道中的标准输入执行。当管道将被ps
的标准输出填充时,该命令将在ps
命令执行时执行。
调用主命令ps
,并将标准输出定向到grep
命令使用的管道。
Grep 进行通信以从管道中获取标准输出。
我喜欢这种方式,因为它是用subprocess
接口轻轻包裹的自然管道概念。
【讨论】:
为避免僵尸,请在grep_proc.communicate()
之后调用ps_proc.wait()
。 err
始终为 None
,除非您设置了 stderr=subprocess.PIPE
。【参考方案7】:
之前的答案错过了一个重要的点。正如 geocar 所指出的,Replacing shell pipeline 基本上是正确的。 几乎在管道的最后一个元素上运行 communicate
就足够了。
剩下的问题是将输入数据传递给管道。对于多个子进程,最后一个元素上的简单 communicate(input_data)
不起作用 - 它永远挂起。您需要像这样手动创建一个管道和一个子级:
import os
import subprocess
input = """\
input data
more input
""" * 10
rd, wr = os.pipe()
if os.fork() != 0: # parent
os.close(wr)
else: # child
os.close(rd)
os.write(wr, input)
os.close(wr)
exit()
p_awk = subprocess.Popen(["awk", " print $2; "],
stdin=rd,
stdout=subprocess.PIPE)
p_sort = subprocess.Popen(["sort"],
stdin=p_awk.stdout,
stdout=subprocess.PIPE)
p_awk.stdout.close()
out, err = p_sort.communicate()
print (out.rstrip())
现在子进程通过管道提供输入,父进程调用communicate(),按预期工作。使用这种方法,您可以创建任意长的管道,而无需求助于“将部分工作委托给 shell”。不幸的是,subprocess documentation 没有提到这一点。
有一些方法可以在没有管道的情况下达到同样的效果:
from tempfile import TemporaryFile
tf = TemporaryFile()
tf.write(input)
tf.seek(0, 0)
现在将stdin=tf
用于p_awk
。看你喜欢什么口味了。
上面仍然不是 100% 等效于 bash 管道,因为信号处理不同。如果添加另一个截断sort
输出的管道元素,您可以看到这一点,例如head -n 10
。使用上面的代码,sort
将向stderr
打印“Broken pipe”错误消息。当您在 shell 中运行相同的管道时,您不会看到此消息。 (这是唯一的区别,stdout
中的结果是相同的)。原因似乎是python的Popen
将SIG_IGN
设置为SIGPIPE
,而shell将其留在SIG_DFL
,而sort
的信号处理在这两种情况下是不同的。
【讨论】:
在最后一个进程上运行通信就足够了,看我的回答。【参考方案8】:编辑:pipes
在 Windows 上可用,但至关重要的是,在 Windows 上似乎实际上工作。请参阅下面的 cmets。
Python 标准库现在包含用于处理此问题的 pipes
模块:
https://docs.python.org/2/library/pipes.html, https://docs.python.org/3.4/library/pipes.html
我不确定这个模块已经存在了多久,但这种方法似乎比使用 subprocess
简单得多。
【讨论】:
pipes
甚至在 subprocess
模块之前就已经存在。它构建了一个 (*nix) shell 管道(带有"|"
的字符串,在/bin/sh
中执行)。它不是便携式的。它不是 subprocess
模块的替代品,它是可移植的,不需要启动 shell 来运行命令。 pipes
接口是从 Enterprise JavaBeans 是闪亮的新事物的时候开始的(它不是恭维)。您能否提供pipes
代码示例,它“比子流程简单得多”:check_call('echo "input data" | a | b > outfile.txt', shell=True)
from my answer?
@J.F.Sebastian Huh。你的check_call
命令不应该同样不可移植吗? DOS 提供标准(即*NIX 可比,至少 AFAIK)|
行为,那么您希望 pipes
不能在哪些系统上工作?我承认将check_call
与代表你的shell 命令的字符串一起使用可以说与使用pipes
一样简单,但我希望有一些东西可以促进管道的编程构造,而不是仅仅将单个字符串传递给外壳(你的其他例子)。
正如我在评论中所说,plumbum
看起来确实不错——它似乎提供了我正在寻找的简单性、灵活性和功能。但是,语法完全不透明且非 Pythonic。所以我想要的是与标准 *NIX-shell 管道一样简单和容易的东西(如果可能稍微不那么简洁),同时在语法和风格上仍然“看起来”像 Python。 pipes
,乍一看,肯定满足这些要求;但是,如果您认为它是不可移植的(您可能是)是对的,那么它当然是最不吸引人的选择。
...而且,是的,它看起来像 echo hello world
和 C:\cygwin\bin\tr a-z A-Z
的简单管道在 Windows 上失败,即使 echo hello world | C:\cygwin\bin\tr.exe a-z A-Z
有效。这……奇怪而令人失望。【参考方案9】:
对我来说,下面的方法是最干净和最容易阅读的
from subprocess import Popen, PIPE
def string_to_2_procs_to_file(input_s, first_cmd, second_cmd, output_filename):
with open(output_filename, 'wb') as out_f:
p2 = Popen(second_cmd, stdin=PIPE, stdout=out_f)
p1 = Popen(first_cmd, stdout=p2.stdin, stdin=PIPE)
p1.communicate(input=bytes(input_s))
p1.wait()
p2.stdin.close()
p2.wait()
可以这样调用:
string_to_2_procs_to_file('input data', ['awk', '-f', 'script.awk'], ['sort'], 'output.txt')
【讨论】:
以上是关于如何使用 subprocess.Popen 通过管道连接多个进程?的主要内容,如果未能解决你的问题,请参考以下文章
如何从 subprocess.Popen 使用 STDIN [重复]
使用 subprocess.Popen 通过 SSH 或 SCP 发送密码
如何将 Subprocess.popen() 与 pyinstaller 一起使用?
如何在 Windows 上使用带有内置命令的 subprocess.Popen