Python中主进程和子进程之间的动态通信
Posted
技术标签:
【中文标题】Python中主进程和子进程之间的动态通信【英文标题】:Dynamic communication between main and subprocess in Python 【发布时间】:2019-01-06 09:35:36 【问题描述】:我在 Python 中工作,我想找到一个工作流来使两个进程(main-process 和 sub-process)能够相互通信。我的意思是 main-process 向 sub-process 发送一些数据的能力(也许,通过写入 sub-process's stdin ) 以及 子进程 将一些数据发送回主进程的能力。这也意味着两者都可以读取发送给他们的数据(我正在考虑从标准输入读取)。
我试图使用 subprocess 库,但它似乎旨在与旨在仅提供一次输出然后终止的进程一起使用,而我想 交换数据动态,只有在收到这样的命令时才关闭子进程。
我在这里阅读了很多关于 *** 解决与我密切相关的问题的答案,但没有一个让我感到满意,因为这些答案本应与我的问题在一个重要细节上有所不同:我需要我的 主进程能够交换数据与其子进程根据需要动态多次,而不仅仅是一次,这反过来意味着 子进程 应该运行,直到它从 主进程 接收到某个命令以终止。
我对使用第三方库持开放态度,但如果您提出仅基于 Python 标准库的解决方案会更好。
【问题讨论】:
IPC with a Python subprocess的可能重复 @Omer 不,不是。首先,您所指的问题中的任务更具体,因为子流程的标准流不能被使用,因为它们是为其他东西保留的。其次,我要求动态交换数据,而据我所知,那里提出的实现产生的结果与我正在寻找的结果相反。 自从我使用管道以来已经有一段时间了,但是 iirc 它们允许您在进程之间动态交换数据,这是一个非 Python 的解释:tldp.org/LDP/lpg/node7.html 很确定链接的线程谈到了类似的事情,请尝试重新-检查它(虽然我可能错了) 听起来您仍然在为两个 Python 进程寻找好的旧 IPC...。其中:a)有点使问题不是真正的Python特定的; b) 在选择最合适的 IPC 机制方面缺少一些位(在查看如何在 Python 中使用它之前);我们知道这些流程预计会在更长的时间内交换数据。这种通信是双向的吗?子流程是否只希望与“主”流程一样长,或者它实际上可以衍生出自己的生命?传递信息的性质?... @OndrejK。抱歉,如果我的问题中的某些内容对您来说似乎含糊不清,尽管我认为我尽可能清楚地说明了这一点:我需要主进程能够生成子进程并向其发送数据(它可以是纯文本)同时还能够从中接收消息(也可以是文本);子进程应该运行直到主进程说它终止(或者,也许,直到主进程杀死它)。感谢您与我们联系。 【参考方案1】:您想用subprocess.PIPE
为标准输入和输出创建一个Popen
对象,并使用它的文件对象进行通信——而不是使用像run
这样的cantrips(以及旧的,更具体的,如check_output
)。挑战在于避免死锁:很容易陷入每个进程都在尝试写入、管道缓冲区填满(因为没有人从它们读取)以及一切都挂起的情况。您还必须记住在两个进程中都使用 flush
,以避免请求或响应卡在 file
对象的缓冲区中。
Popen.communicate
用于避免这些问题,但它仅支持单个字符串(而不是正在进行的对话)。传统的解决方案是select
,但它也可以使用单独的线程来发送请求和读取结果。 (尽管有 GIL,这是使用 CPython 线程的原因之一:每个线程都存在以运行而另一个被阻塞,因此几乎没有争用。)当然,同步是一个问题,并且您可能需要做一些工作以使多线程客户端在外部表现得像一个简单的同步函数调用。
注意两个进程都需要flush
,但如果其中一个实现这样的非阻塞I/O就足够了;一个通常在启动另一个的过程中完成这项工作,因为这是众所周知的必要条件(此类程序是例外)。
【讨论】:
感谢您的回复!实际上,我尝试使用 Popen,但是communicate()——正如你所指出的——只支持一个字符串,这就是我拒绝这个想法的原因,尽管我没有尝试以不同的方式使用 Popen(只是阅读/从标准写入而不是使用通信())。我应该试试这个吗?另外,我正在考虑使用 Asyncio 标准库来解决死锁问题。你有什么想法吗? @AntonReient:communicate
是一种专门提供的工具,因为它可以让人们解决一个常见问题而没有任何(感知的)复杂性。我的回答是使用Popen
流,对吗?【参考方案2】:
看起来管道可能是您的用例的合适选择。请注意,在正常情况下,读取和写入端都希望分别写入或读取数据。还要确保您不会对缓冲感到惊讶(没有任何反应,因为缓冲区不会自动刷新,除非在预期的边界上,除非相应设置)。
如何在两个进程之间使用两个管道(它们是单向的)的基本示例:
import os
def child():
"""This function is executed in a child process."""
infile = os.fdopen(r1)
outfile = os.fdopen(w2, 'w', buffering=1)
for line in infile:
if line.rstrip() == 'quit':
break
print(line.upper(), end='', file=outfile)
def parent():
"""This function is executed in a parent process."""
outfile = os.fdopen(w1, 'w', buffering=1)
infile = os.fdopen(r2)
print('Foo', file=outfile)
print(infile.readline(), end='')
print('bar', file=outfile)
print(infile.readline(), end='')
print('quit', file=outfile)
(r1, w1) = os.pipe() # for parent -> child writes
(r2, w2) = os.pipe() # for child -> parent writes
pid = os.fork()
if pid == 0:
child() # child code runs here
elif pid > 0:
parent() # parent code runs here
os.waitpid(pid, 0) # wait for child
else:
raise RuntimeError("This should not have happened.")
确实,使用subprocess
会更容易、更实用,而且您可能想运行另一个程序。前者需要被告知不要关闭管道文件描述符,后者需要管道文件描述符是可继承的(没有设置O_CLOEXEC
标志)。
儿童节目:
import os
import sys
infile = os.fdopen(int(sys.argv[1]))
outfile = os.fdopen(int(sys.argv[2]), 'w', buffering=1)
for line in infile:
if line.rstrip() == 'quit':
break
print(line.upper(), end='', file=outfile)
父程序:
import os
import subprocess
(r1, w1) = os.pipe2(0) # for parent -> child writes
(r2, w2) = os.pipe2(0) # for child -> parent writes
child = subprocess.Popen(['./child.py', str(r1), str(w2)], pass_fds=(r1, w2))
outfile = os.fdopen(w1, 'w', buffering=1)
infile = os.fdopen(r2)
print('Foo', file=outfile)
print(infile.readline(), end='')
print('bar', file=outfile)
print(infile.readline(), end='')
print('quit', file=outfile)
child.wait()
如果子程序既不需要标准输入也不需要标准输出,它们可以用来分别获取子程序输入和输出的信息。这甚至会更简单。
儿童节目:
import sys
for line in sys.stdin:
if line.rstrip() == 'quit':
break
print(line.upper(), end='', flush=True)
父程序:
import os
import subprocess
(r1, w1) = os.pipe2(0) # for parent -> child writes
(r2, w2) = os.pipe2(0) # for child -> parent writes
child = subprocess.Popen(['./child.py'], stdin=r1, stdout=w2)
outfile = os.fdopen(w1, 'w', buffering=1)
infile = os.fdopen(r2)
print('Foo', file=outfile)
print(infile.readline(), end='')
print('bar', file=outfile)
print(infile.readline(), end='')
print('quit', file=outfile)
child.wait()
如上所述,它并不是真正的 Python 特定的,这些只是关于如何使用管道作为一种选项的粗略提示。
【讨论】:
这仍然存在死锁(直到事务在实际应用程序中变得更大时才可见)以及刷新的关键问题。 @DavisHerring 你使用过 Asyncio 标准库吗?如果有,你觉得会有帮助吗?我理解这个实现带来的死锁问题。例如,NodeJs 通过使用以非阻塞方式工作的侦听器(异步事件驱动架构)来解决它。在 Python 中实现这样的东西会很酷。想法? @DavisHerring 我真的只是想了解基础知识。至于死锁的可能性:是的,父母/孩子不能盲目地假设有人在听,更不用说一次写字了。刷新:应该在所有示例中进行行缓冲,并在单个消息或缓冲区已满时刷新(我假设为 4k)。 @OndrejK.:确实某些合理的协议可以防止死锁;我应该这么说的。然而,行缓冲仅在终端上是自动的。一个可以使用伪终端来实现它(cf.unbuffer
),但这比改变服务器程序(如果可以的话)要复杂得多。跨度>
该示例使用更高级别的 (python) 接口通过io.TextIOWrapper 读取/写入(以文本模式打开类似文件的对象)。 “如果 line_buffering 为 True,则当 write 调用包含换行符或回车时,会隐含 flush()。” 我并不完全不同意你的观点,但我也试图保持复杂性下来。以上是关于Python中主进程和子进程之间的动态通信的主要内容,如果未能解决你的问题,请参考以下文章