如何加快与子进程的通信
Posted
技术标签:
【中文标题】如何加快与子进程的通信【英文标题】:How to speed up communication with subprocesses 【发布时间】:2014-08-03 15:24:00 【问题描述】:我正在使用 Python 2 subprocess
和 threading
线程来获取标准输入,使用二进制文件 A
、B
和 C
对其进行处理,并将修改后的数据写入标准输出。
这个脚本(我们称之为:A_to_C.py
)非常慢,我想学习如何修复它。
大致流程如下:
A_process = subprocess.Popen(['A', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
produce_A_thread = threading.Thread(target=produceA, args=(sys.stdin, A_process.stdin))
B_process = subprocess.Popen(['B', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
convert_A_to_B_thread = threading.Thread(target=produceB, args=(A_process.stdout, B_process.stdin))
C_process = subprocess.Popen(['C', '-'], stdin=subprocess.PIPE)
convert_B_to_C_thread = threading.Thread(target=produceC, args=(B_process.stdout, C_process.stdin))
produce_A_thread.start()
convert_A_to_B_thread.start()
convert_B_to_C_thread.start()
produce_A_thread.join()
convert_A_to_B_thread.join()
convert_B_to_C_thread.join()
A_process.wait()
B_process.wait()
C_process.wait()
这个想法是标准输入进入A_to_C.py
:
A
二进制文件处理一大块标准输入并使用函数produceA
创建A
-输出。
B
二进制文件处理A
的标准输出块,并通过函数produceB
创建B
-输出。
C
二进制文件通过函数produceC
处理B
的标准输出块,并将C
-output 写入标准输出。
我使用 cProfile 进行了分析,并且该脚本中的几乎所有时间似乎都花在了获取线程锁上。
例如,在测试 417s 作业中,416s(>99% 的总运行时间)用于获取线程锁:
$ python
Python 2.6.6 (r266:84292, Nov 21 2013, 10:50:32)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import pstats
>>> p = pstats.Stats('1.profile')
>>> p.sort_stats('cumulative').print_stats(10)
Thu Jun 12 22:19:07 2014 1.profile
1755 function calls (1752 primitive calls) in 417.203 CPU seconds
Ordered by: cumulative time
List reduced from 162 to 10 due to restriction <10>
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.020 0.020 417.203 417.203 A_to_C.py:90(<module>)
1 0.000 0.000 417.123 417.123 A_to_C.py:809(main)
6 0.000 0.000 416.424 69.404 /foo/python/2.7.3/lib/python2.7/threading.py:234(wait)
32 416.424 13.013 416.424 13.013 method 'acquire' of 'thread.lock' objects
3 0.000 0.000 416.422 138.807 /foo/python/2.7.3/lib/python2.7/threading.py:648(join)
3 0.000 0.000 0.498 0.166 A_to_C.py:473(which)
37 0.000 0.000 0.498 0.013 A_to_C.py:475(is_exe)
3 0.496 0.165 0.496 0.165 posix.access
6 0.000 0.000 0.194 0.032 /foo/python/2.7.3/lib/python2.7/subprocess.py:475(_eintr_retry_call)
3 0.000 0.000 0.191 0.064 /foo/python/2.7.3/lib/python2.7/subprocess.py:1286(wait)
我的threading.Thread
和/或subprocess.Popen
安排有什么问题导致此问题?
【问题讨论】:
这可能是由.join()
调用引起的,如here 所写
您是否尝试过将代码纯粹作为子进程编写,没有线程?
您误读了分析器输出。 cProfile 发现您的 主线程 必须等待 .join()
调用完成。 416.424
的总时间只是主线程等待工作线程完成工作。您的程序的其余部分几乎没有工作,因为您受 I/O 限制(例如,等待外部进程完成它们的工作)。 这是怎么回事?
问题是,如果我用A|B|C
和awk
来模拟生产功能所做的一些事情,那么完成任务所需的时间就会大大减少。这种基于 Python 的线程方法似乎比基于 shell 或其他方法消耗更多的时间。问题是 Python 脚本提供了一些使用基于 shell 的方法难以模拟的功能,但性能使脚本无法使用;我的目标是弄清楚为什么这会花费这么多时间,或者找到一种替代方法来管理标准 I/O 流。
你能发布完整的代码吗?这是缺少produceA
produceB
produceC
函数。 (或者至少,用通过的虚拟函数来展示它)
【参考方案1】:
我认为您只是被 cProfile 的工作方式误导了。例如,这是一个使用两个线程的简单脚本:
#!/usr/bin/python
import threading
import time
def f():
time.sleep(10)
def main():
t = threading.Thread(target=f)
t.start()
t.join()
如果我使用 cProfile 对此进行测试,我会得到以下结果:
>>> import test
>>> import cProfile
>>> cProfile.run('test.main()')
60 function calls in 10.011 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 10.011 10.011 <string>:1(<module>)
1 0.000 0.000 10.011 10.011 test.py:10(main)
1 0.000 0.000 0.000 0.000 threading.py:1008(daemon)
2 0.000 0.000 0.000 0.000 threading.py:1152(currentThread)
2 0.000 0.000 0.000 0.000 threading.py:241(Condition)
2 0.000 0.000 0.000 0.000 threading.py:259(__init__)
2 0.000 0.000 0.000 0.000 threading.py:293(_release_save)
2 0.000 0.000 0.000 0.000 threading.py:296(_acquire_restore)
2 0.000 0.000 0.000 0.000 threading.py:299(_is_owned)
2 0.000 0.000 10.011 5.005 threading.py:308(wait)
1 0.000 0.000 0.000 0.000 threading.py:541(Event)
1 0.000 0.000 0.000 0.000 threading.py:560(__init__)
2 0.000 0.000 0.000 0.000 threading.py:569(isSet)
4 0.000 0.000 0.000 0.000 threading.py:58(__init__)
1 0.000 0.000 0.000 0.000 threading.py:602(wait)
1 0.000 0.000 0.000 0.000 threading.py:627(_newname)
5 0.000 0.000 0.000 0.000 threading.py:63(_note)
1 0.000 0.000 0.000 0.000 threading.py:656(__init__)
1 0.000 0.000 0.000 0.000 threading.py:709(_set_daemon)
1 0.000 0.000 0.000 0.000 threading.py:726(start)
1 0.000 0.000 10.010 10.010 threading.py:911(join)
10 10.010 1.001 10.010 1.001 method 'acquire' of 'thread.lock' objects
2 0.000 0.000 0.000 0.000 method 'append' of 'list' objects
1 0.000 0.000 0.000 0.000 method 'disable' of '_lsprof.Profiler' objects
4 0.000 0.000 0.000 0.000 method 'release' of 'thread.lock' objects
4 0.000 0.000 0.000 0.000 thread.allocate_lock
2 0.000 0.000 0.000 0.000 thread.get_ident
1 0.000 0.000 0.000 0.000 thread.start_new_thread
如您所见,它表明几乎所有时间都花在获取锁上。当然,我们知道这并不是脚本在做什么的准确表示。实际上,所有时间都花在了f()
内部的time.sleep
调用上。 acquire
调用的高 tottime
只是因为 join
正在等待 f
完成,这意味着它必须坐下来等待获取锁。但是,cProfile 根本没有显示在f
中花费的任何时间。我们可以清楚地看到实际发生了什么,因为示例代码非常简单,但是在更复杂的程序中,这个输出非常具有误导性。
您可以通过使用另一个分析库获得更可靠的结果,例如yappi:
>>> import test
>>> import yappi
>>> yappi.set_clock_type("wall")
>>> yappi.start()
>>> test.main()
>>> yappi.get_func_stats().print_all()
Clock type: wall
Ordered by: totaltime, desc
name #n tsub ttot tavg
<stdin>:1 <module> 2/1 0.000025 10.00801 5.004003
test.py:10 main 1 0.000060 10.00798 10.00798
..2.7/threading.py:308 _Condition.wait 2 0.000188 10.00746 5.003731
..thon2.7/threading.py:911 Thread.join 1 0.000039 10.00706 10.00706
..ython2.7/threading.py:752 Thread.run 1 0.000024 10.00682 10.00682
test.py:6 f 1 0.000013 10.00680 10.00680
..hon2.7/threading.py:726 Thread.start 1 0.000045 0.000608 0.000608
..thon2.7/threading.py:602 _Event.wait 1 0.000029 0.000484 0.000484
..2.7/threading.py:656 Thread.__init__ 1 0.000064 0.000250 0.000250
..on2.7/threading.py:866 Thread.__stop 1 0.000025 0.000121 0.000121
..lib/python2.7/threading.py:541 Event 1 0.000011 0.000101 0.000101
..python2.7/threading.py:241 Condition 2 0.000025 0.000094 0.000047
..hreading.py:399 _Condition.notifyAll 1 0.000020 0.000090 0.000090
..2.7/threading.py:560 _Event.__init__ 1 0.000018 0.000090 0.000090
..thon2.7/encodings/utf_8.py:15 decode 2 0.000031 0.000071 0.000035
..threading.py:259 _Condition.__init__ 2 0.000064 0.000069 0.000034
..7/threading.py:372 _Condition.notify 1 0.000034 0.000068 0.000068
..hreading.py:299 _Condition._is_owned 3 0.000017 0.000040 0.000013
../threading.py:709 Thread._set_daemon 1 0.000018 0.000035 0.000035
..ding.py:293 _Condition._release_save 2 0.000019 0.000033 0.000016
..thon2.7/threading.py:63 Thread._note 7 0.000020 0.000020 0.000003
..n2.7/threading.py:1152 currentThread 2 0.000015 0.000019 0.000009
..g.py:296 _Condition._acquire_restore 2 0.000011 0.000017 0.000008
../python2.7/threading.py:627 _newname 1 0.000014 0.000014 0.000014
..n2.7/threading.py:58 Thread.__init__ 4 0.000013 0.000013 0.000003
..threading.py:1008 _MainThread.daemon 1 0.000004 0.000004 0.000004
..hon2.7/threading.py:569 _Event.isSet 2 0.000003 0.000003 0.000002
使用yappi
,更容易看出时间花在f
上。
我怀疑您会发现,实际上,您脚本的大部分时间都花在了 produceA
、produceB
和 produceC
中正在完成的任何工作上。
【讨论】:
“我怀疑您会发现,实际上,您脚本的大部分时间都花在了生产 A、生产 B 和生产 C 中正在完成的任何工作上。” - 这是一个很好的假设,然而,它恰好不是真的。 OP 脚本中的 CPU 时间总量(在一项特定测试中)约为 6 秒,但运行挂钟时间需要约 14 秒(仅传输数据需要 出乎意料的慢。 嗨@dano,我如何检索前 10 个耗时的方法?我尝试了 yappi.get_func_stats(yappi.SORTTYPE_TSUB, yappi.SORTORDER_DESC, 10).print_all() 但我得到一个错误 yappi.SORTTYPE_TSUB 不存在【参考方案2】:TL;DR 如果您的程序运行速度比预期慢,这可能是由于中间函数所做的细节而不是由于 IPC 或线程。使用模拟函数和进程(尽可能简单)进行测试,以隔离将数据传入/传出子进程的开销。在基于您的代码(如下)的基准测试中,向/从子进程传递数据时的性能似乎大致相当于直接使用 shell 管道; python 在这个任务上并不是特别慢。
原始代码是怎么回事
原代码的一般形式是:
def produceB(from_stream, to_stream):
while True:
buf = from_stream.read()
processed_buf = do_expensive_calculation(buf)
to_stream.write(processed_buf)
这里读取和写入之间的计算大约占所有进程(主进程和子进程)总 cpu 时间的 2/3 - 这是 cpu 时间,而不是挂钟时间。
我认为这会阻止 I/O 全速运行。读写和计算都需要有自己的线程,queues 提供读取和计算之间以及计算和写入之间的缓冲(因为管道提供的缓冲量我相信是不够的)。
我在下面显示,如果在读写之间没有处理(或等效地:如果中间处理在单独的线程中完成),那么线程+子进程的吞吐量非常高。也可以有单独的线程用于读取和写入;这增加了一些开销,但使写入不会阻塞读取,反之亦然。三个线程(读取、写入和处理)甚至更好,然后任何步骤都不会阻塞其他步骤(当然,在队列大小的限制内)。
一些基准测试
以下所有基准测试均基于 Ubuntu 14.04LTS 64 位(Intel i7、Ivy Bridge、四核)上的 python 2.7.6。测试是在两个dd
进程之间以4KB 的块传输大约1GB 的数据,并通过python 作为中介传递数据。 dd 进程使用中等大小(4KB)的块;典型的文本 I/O 会更小(除非它被解释器巧妙地缓冲等),典型的二进制 I/O 当然会大得多。我有一个基于您是如何做到这一点的示例,还有一个基于我前一段时间尝试过的另一种方法的示例(结果速度较慢)。顺便说一句,感谢您发布这个问题,它很有用。
线程和阻塞 I/O
首先,让我们将问题中的原始代码转换为一个稍微简单的自包含示例。这只是两个进程与一个线程通信,将数据从一个泵送到另一个,执行阻塞读取和写入。
import subprocess, threading
A_process = subprocess.Popen(["dd", "if=/dev/zero", "bs=4k", "count=244140"], stdout=subprocess.PIPE)
B_process = subprocess.Popen(["dd", "of=/dev/null", "bs=4k"], stdin=subprocess.PIPE)
def convert_A_to_B(src, dst):
read_size = 8*1024
while True:
try:
buf = src.read(read_size)
if len(buf) == 0: # This is a bit hacky, but seems to reliably happen when the src is closed
break
dst.write(buf)
except ValueError as e: # Reading or writing on a closed fd causes ValueError, not IOError
print str(e)
break
convert_A_to_B_thread = threading.Thread(target=convert_A_to_B, args=(A_process.stdout, B_process.stdin))
convert_A_to_B_thread.start()
# Here, watch out for the exact sequence to clean things up
convert_A_to_B_thread.join()
A_process.wait()
B_process.stdin.close()
B_process.wait()
结果:
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.638977 s, 1.6 GB/s
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.635499 s, 1.6 GB/s
real 0m0.678s
user 0m0.657s
sys 0m1.273s
还不错!事实证明,在这种情况下,理想的读取大小约为 8k-16KB,小得多和大得多的大小会稍微慢一些。这可能与我们要求 dd 使用的 4KB 块大小有关。
选择和非阻塞 I/O
之前在看这类问题的时候,我朝着使用select()
、非阻塞I/O和单线程的方向发展。我的问题就是一个例子:How to read and write from subprocesses asynchronously?。那是为了并行读取两个进程,我在下面将其扩展到从一个进程读取并写入另一个进程。非阻塞写入的大小限制为 PIPE_BUF 或更小,在我的系统上为 4KB;为简单起见,读取也是 4KB,尽管它们可以是任意大小。这有一些奇怪的极端情况(以及莫名其妙的挂起,具体取决于细节),但在下面的表格中它可以可靠地工作。
import subprocess, select, fcntl, os, sys
p1 = subprocess.Popen(["dd", "if=/dev/zero", "bs=4k", "count=244140"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["dd", "of=/dev/null", "bs=4k"], stdin=subprocess.PIPE)
def make_nonblocking(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
make_nonblocking(p1.stdout)
make_nonblocking(p2.stdin)
print "PIPE_BUF = %d" % (select.PIPE_BUF)
read_size = select.PIPE_BUF
max_buf_len = 1 # For reasons which I have not debugged completely, this hangs sometimes when set > 1
bufs = []
while True:
inputready, outputready, exceptready = select.select([ p1.stdout.fileno() ],[ p2.stdin.fileno() ],[])
for fd in inputready:
if fd == p1.stdout.fileno():
if len(bufs) < max_buf_len:
data = p1.stdout.read(read_size)
bufs.append(data)
for fd in outputready:
if fd == p2.stdin.fileno() and len(bufs) > 0:
data = bufs.pop(0)
p2.stdin.write(data)
p1.poll()
# If the first process is done and there is nothing more to write out
if p1.returncode != None and len(bufs) == 0:
# Again cleanup is tricky. We expect the second process to finish soon after its input is closed
p2.stdin.close()
p2.wait()
p1.wait()
break
结果:
PIPE_BUF = 4096
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 3.13722 s, 319 MB/s
244133+0 records in
244133+0 records out
999968768 bytes (1.0 GB) copied, 3.13599 s, 319 MB/s
real 0m3.167s
user 0m2.719s
sys 0m2.373s
然而,这比上面的版本要慢得多(即使读取/写入大小都是 4KB 以进行苹果对苹果的比较)。我不知道为什么。
附:后期添加:似乎可以忽略或超过PIPE_BUF。这会导致大部分时间从p2.stdin.write()
抛出 IOError 异常(errno=11,暂时不可用),大概是当管道中有足够的空间来写一些东西,但小于我们请求的完整大小时。上面与read_size = 64*1024
相同的代码,在捕获并忽略该异常的情况下,以 1.4GB/s 的速度运行。
直接管道
作为基线,使用 shell 版本的管道(在子进程中)运行它有多快?让我们看看:
import subprocess
subprocess.call("dd if=/dev/zero bs=4k count=244140 | dd of=/dev/null bs=4k", shell=True)
结果:
244140+0 records in
244140+0 records out
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.425261 s, 2.4 GB/s
999997440 bytes (1.0 GB) copied, 0.423687 s, 2.4 GB/s
real 0m0.466s
user 0m0.300s
sys 0m0.590s
这明显比线程化 python 示例快。但是,这只是一份副本,而线程化的 python 版本正在做两份(进出 python)。修改命令为"dd if=/dev/zero bs=4k count=244140 | dd bs=4k | dd of=/dev/null bs=4k"
,性能达到1.6GB,与python示例一致。
如何在一个完整的系统中进行比较
关于如何在完整系统中进行比较的一些其他想法。再次为简单起见,这只是两个进程,两个脚本具有完全相同的convert_A_to_B()
函数。
脚本1:在python中传递数据,如上
A_process = subprocess.Popen(["A", ...
B_process = subprocess.Popen(["B", ...
convert_A_to_B_thread = threading.Thread(target=convert_A_to_B, ...
脚本2:比较脚本,在shell中传递数据
convert_A_to_B(sys.stdin, sys.stdout)
在 shell 中运行它:A | python script_2.py | B
这允许在完整系统中进行同类比较,而无需使用模拟函数/进程。
块读取大小如何影响结果
对于这个测试,使用上面第一个(线程)示例中的代码,dd
和 python 脚本都设置为使用相同的块大小读取/写入。
| Block size | Throughput |
|------------|------------|
| 1KB | 249MB/s |
| 2KB | 416MB/s |
| 4KB | 552MB/s |
| 8KB | 1.4GB/s |
| 16KB | 1.8GB/s |
| 32KB | 2.9GB/s |
| 64KB | 3.0GB/s |
| 128KB | 1.0GB/s |
| 256KB | 600MB/s |
理论上,更大的缓冲区应该有更好的性能(可能达到缓存效果),但实际上,Linux 管道在使用非常大的缓冲区时会减慢速度,即使使用纯 shell 管道也是如此。
【讨论】:
有趣的是,我的convert_A_to_B
等效函数与您的几乎相同。它以 4 kB 的块读取数据。我将读取大小从 4 kB 更改为 8 kB,但这对我的测试结果没有影响。
@AlexReynolds:我应该澄清一下,只有当两个 dd 进程都设置为使用 4KB 时,8-16KB 才是最佳选择。当使用一系列尺寸时,我在上面添加了一个比较表。
@AlexReynolds:您能否使用我在答案最后部分概述的方法将您的代码与普通的 shell 管道进行比较?可能您已经以与壳管相同的速度运行。如果您想要一种比管道更快的方法,我可以将一些东西放在一起(例如,mmap 应该快得多)。但我的理论是,你已经接近壳管速度,由于在子进程中做事,开销最小。
@AlexReynolds:您在另一条评论中提到您的测试在 68 或 41 分钟内运行。它传输多少数据? (在这段时间内,管道可以做几 TB,共享内存可以做几十 TB)你能发布一个小的、完整的可运行示例(包括外部程序和转换函数)吗?在无法实际运行测试的情况下,很难想出任何方法来加速它。【参考方案3】:
您对 subprocess.Popen() 的调用隐式指定了 bufsize 的默认值 0,这会强制进行无缓冲 I/O。尝试添加一个合理的缓冲区大小(4K、16K,甚至 1M),看看是否有什么不同。
【讨论】:
通过添加bufsize=-1
,我能够将样本输入的处理时间从 68 分钟缩短到 41 分钟。这绝对是一种改进——所以到目前为止,你是领先者,任何使用subprocess.Popen()
的人都应该知道bufsize
参数——但最好的解决方案(我们不拥有的解决方案,我的代码库涉及,并用 C++ 编写)在 4.5 分钟内完成这项工作,所以我还有一些路要走。我可能会开始研究 C 中的 popen()
和 pthreads
。【参考方案4】:
既然您在 cmets 中谈到了 popen()
和 pthreads
,我猜您是在 POSIX 系统下(可能是 Linux)。
那么您是否尝试使用subprocess32
而不是标准的subprocess
库。
documentation 强烈鼓励使用它,并且可能会带来一些改进。
PS:我相信混合分叉 (subprocess
) 和线程是 a bad idea。
PS2:为什么python produceA.py | A | python produceB.py | B | python produceC.py | C
不符合您的需求?还是使用subprocess
的等价物?
【讨论】:
我追求的是单一脚本方法。 @Lithy:“混合分叉(子进程)和线程是一个坏主意” - 但是分叉所做的第一件事就是调用 exec(或者更确切地说是 execve),它会忘记所有可能拥有的信息无论如何都受到互斥锁的保护。不,混合 forks+exec 和线程就可以了。【参考方案5】:这种情况特别适合管道,其中并行性由操作系统隐式管理。由于您追求的是单一脚本解决方案,因此您可以:
#! /usr/bin/python2
import sys
import subprocess
import pipes
# Define these as needed
def produceA(input, output):
output.write(input.read())
def produceB(input, output):
output.write(input.read())
def produceC(input, output):
output.write(input.read())
# Magic starts here
COMMAND = "me prepare_A | A - | me A_to_B | B - | me B_to_C | C -"
def bootstrap(input, output):
"""Prepares and runs the pipeline."""
me = "./".format(pipes.quote(__file__))
subprocess.call(
COMMAND.format(me=me),
stdin=input, stdout=output, shell=True, bufsize=-1
)
if __name__ == '__main__':
ACTIONS =
"prepare_A": produceA,
"A_to_B": produceB,
"B_to_C": produceC
action = ACTIONS[sys.argv[1]] if len(sys.argv) > 1 else bootstrap
action(sys.stdin, sys.stdout)
此脚本将设置管道或运行produce
函数之一,具体取决于指定的命令。
使其可执行并在不带参数的情况下运行它:
./A_to_C.py < A.txt > C.txt
注意:您似乎使用的是 Python 2.6,因此此解决方案适用于 Python 2.x,尽管它应该在 Python 3.x 中运行良好,除了 quote
函数已移至 shlex
Python 3.3
【讨论】:
“多线程并没有带来任何可感知的优势(每个进程/线程都在等待来自其前身的一些输入)” - 说什么?管道中的每一步都可以同时运行。在管道 A->B->C 中,C 可以在单元 1 上工作(最早进入的),而 B 在 2 上工作,A 在 3 上工作。接下来,C 在 2 上工作,B 在 3 上工作,等等......如果这些步骤花费的时间大致相同,那么具有足够内核的机器上的吞吐量会增加三倍。这怎么不是优势? 没有。除非您有缓冲区或异步写入例程,否则如果 B 忙于计算(即不读取),A 将阻塞直到它可以写入,所以直到 B 读取。不管怎样,这不是重点,所以我删除了它。以上是关于如何加快与子进程的通信的主要内容,如果未能解决你的问题,请参考以下文章