Understanding Multiprocessing: Shared Memory Management, Locks and Queues in Python
Posted 2014-01-11 14:50:57
Multiprocessing is a powerful tool in Python, and I want to understand it more in depth. I want to know when to use regular Locks and Queues and when to use a multiprocessing Manager to share these among all processes.
I came up with the following test scenario with four different conditions for multiprocessing:
1. Using a pool and NO manager
2. Using a pool and a manager
3. Using individual processes and NO manager
4. Using individual processes and a manager
The job
All conditions execute a job function the_job. the_job consists of some printing which is secured by a lock. Moreover, the input to the function is simply put into a queue (to see whether it can be recovered from the queue). This input is simply an index idx taken from range(10), created in the main script called start_scenario (shown at the bottom).
def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and
    finally puts the input into a queue.
    """
    idx = args[0]
    lock = args[1]
    queue = args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who = ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)
The success of a condition is defined as perfectly recalling the input from the queue; see the function read_queue at the bottom.
The conditions
Conditions 1 and 2 are rather self-explanatory. Condition 1 involves creating a lock and a queue and passing these to a process pool:
def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!
    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.imap(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)
(The helper function make_iterator is given at the bottom of this post.)
Condition 1 fails with RuntimeError: Lock objects should only be shared between processes through inheritance.
Condition 2 is rather similar, but now the lock and queue are under the supervision of a manager:
def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!
    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.imap(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)
In condition 3, new processes are started manually, and the lock and queue are created without a manager:
def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager.

    SUCCESSFUL!
    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)
    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)
Condition 4 is similar, but again using a manager:
def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager.

    SUCCESSFUL!
    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)
In both conditions - 3 and 4 - I start a new process for each of the 10 tasks of the_job, with at most ncores processes operating at the very same time. This is achieved with the following helper function:
def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time.

    :param jobfunc: Job to do
    :param iterator:
        Iterator over different parameter settings,
        contains a lock and a queue
    :param ncores:
        Number of processes operating at the same time
    """
    keep_running = True
    process_dict = {}  # Dict containing all subprocesses
    while len(process_dict) > 0 or keep_running:
        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():
            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)
        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)
        # If we have fewer active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                  args=(task,))
                proc.start()
                process_dict[proc.pid] = proc
            except StopIteration:
                # All tasks have been started
                keep_running = False
        time.sleep(0.1)
The results
Only condition 1 fails (RuntimeError: Lock objects should only be shared between processes through inheritance), whereas the other 3 conditions are successful. I am trying to wrap my head around this outcome.
Why does the pool need to share a lock and queue between all processes, while the individual processes from condition 3 don't?
What I know is that for the pool conditions (1 and 2), all data from the iterators is passed via pickling, whereas in the single-process conditions (3 and 4), all data from the iterators is passed by inheritance from the main process (I am using Linux).
I guess that until the memory is changed from within a child process, the same memory that the parent process uses is accessed (copy-on-write). But as soon as one calls lock.acquire(), this should change, and the child processes would then use different locks placed somewhere else in memory, wouldn't they? How does one child process know that a sibling has activated a lock which is not shared via a manager?
Finally, somewhat related: how different are conditions 3 and 4? Both use individual processes, but they differ in the usage of a manager. Are both considered valid code? Or should one avoid using a manager if one doesn't actually need one?
Full script
For those who just want to copy and paste everything to execute the code, here is the full script:
__author__ = 'Me and myself'

import multiprocessing as mp
import time


def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and
    finally puts the input into a queue.
    """
    idx = args[0]
    lock = args[1]
    queue = args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who = ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)
def read_queue(queue):
    """Turns a queue into a normal python list."""
    results = []
    while not queue.empty():
        result = queue.get()
        results.append(result)
    return results


def make_iterator(args, lock, queue):
    """Makes an iterator over args and passes the lock and queue to each element."""
    return ((arg, lock, queue) for arg in args)
def start_scenario(scenario_number=1):
    """Starts one of four multiprocessing scenarios.

    :param scenario_number: Index of scenario, 1 to 4
    """
    args = range(10)
    ncores = 3
    if scenario_number == 1:
        result = scenario_1_pool_no_manager(the_job, args, ncores)
    elif scenario_number == 2:
        result = scenario_2_pool_manager(the_job, args, ncores)
    elif scenario_number == 3:
        result = scenario_3_single_processes_no_manager(the_job, args, ncores)
    elif scenario_number == 4:
        result = scenario_4_single_processes_manager(the_job, args, ncores)

    if result != args:
        print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
    else:
        print 'Scenario %d successful!' % scenario_number
def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!
    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!
    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager.

    SUCCESSFUL!
    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)
    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager.

    SUCCESSFUL!
    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)
def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time.

    :param jobfunc: Job to do
    :param iterator:
        Iterator over different parameter settings,
        contains a lock and a queue
    :param ncores:
        Number of processes operating at the same time
    """
    keep_running = True
    process_dict = {}  # Dict containing all subprocesses
    while len(process_dict) > 0 or keep_running:
        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():
            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)
        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)
        # If we have fewer active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                  args=(task,))
                proc.start()
                process_dict[proc.pid] = proc
            except StopIteration:
                # All tasks have been started
                keep_running = False
        time.sleep(0.1)
def main():
    """Runs 1 out of 4 different multiprocessing scenarios"""
    start_scenario(1)


if __name__ == '__main__':
    main()
Answer 1
multiprocessing.Lock is implemented using a Semaphore object provided by the operating system. On Linux, the child just inherits a handle to the semaphore from the parent via os.fork. This is not a copy of the semaphore; it is actually inheriting the same handle the parent has, the same way file descriptors can be inherited. Windows, on the other hand, does not support os.fork, so it has to pickle the Lock. It does this by creating a duplicate handle to the Windows semaphore that the multiprocessing.Lock object uses internally, using the Windows DuplicateHandle API, which states:

    The duplicate handle refers to the same object as the original handle. Therefore, any changes to the object are reflected through both handles.

The DuplicateHandle API allows you to give ownership of the duplicated handle to the child process, so that the child process can actually use it after unpickling. By creating a duplicated handle owned by the child, you can effectively "share" the lock object.
Here is the semaphore object in multiprocessing/synchronize.py:
class SemLock(object):

    def __init__(self, kind, value, maxvalue):
        sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
        debug('created semlock with handle %s' % sl.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _after_fork(obj):
                obj._semlock._after_fork()
            register_after_fork(self, _after_fork)

    def _make_methods(self):
        self.acquire = self._semlock.acquire
        self.release = self._semlock.release
        self.__enter__ = self._semlock.__enter__
        self.__exit__ = self._semlock.__exit__

    def __getstate__(self):  # This is called when you try to pickle the `Lock`.
        assert_spawning(self)
        sl = self._semlock
        return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)

    def __setstate__(self, state):  # This is called when unpickling a `Lock`
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        debug('recreated blocker with handle %r' % state[0])
        self._make_methods()
Note the assert_spawning call in __getstate__, which gets called when pickling the object. Here is how it is implemented:
#
# Check that the current thread is spawning a child process
#

def assert_spawning(self):
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(self).__name__
            )
That function makes sure you are "inheriting" the Lock by calling thread_is_spawning. On Linux, that method just returns False:
@staticmethod
def thread_is_spawning():
    return False
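As a quick aside (a hypothetical snippet of my own, not from the original answer): on Linux you can trigger exactly the RuntimeError from condition 1 by pickling a Lock yourself, since thread_is_spawning() returns False and assert_spawning raises:

import multiprocessing as mp
import pickle

try:
    pickle.dumps(mp.Lock())   # __getstate__ -> assert_spawning -> RuntimeError
except RuntimeError as err:
    print err   # Lock objects should only be shared between processes through inheritance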
This is because Linux doesn't need to pickle anything to inherit the Lock, so if __getstate__ is actually being called on Linux, we must not be inheriting. On Windows, there is more going on:
def dump(obj, file, protocol=None):
    ForkingPickler(file, protocol).dump(obj)


class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    _tls = thread._local()

    def __init__(self, process_obj):
        ...
        # send information to child
        prep_data = get_preparation_data(process_obj._name)

        to_child = os.fdopen(wfd, 'wb')
        Popen._tls.process_handle = int(hp)
        try:
            dump(prep_data, to_child, HIGHEST_PROTOCOL)
            dump(process_obj, to_child, HIGHEST_PROTOCOL)
        finally:
            del Popen._tls.process_handle
            to_child.close()

    @staticmethod
    def thread_is_spawning():
        return getattr(Popen._tls, 'process_handle', None) is not None
Here, thread_is_spawning returns True if the Popen._tls object has a process_handle attribute. We can see that the process_handle attribute gets created in __init__, then the data we want inherited is passed from the parent to the child using dump, and then the attribute is deleted. So thread_is_spawning will only be True during __init__. According to this python-ideas mailing list thread, this is actually an artificial limitation added to simulate the same behavior as os.fork on Linux. Windows could actually support passing the Lock at any time, because DuplicateHandle can be run at any time.
All of the above applies to Queue objects as well, because they use Lock internally.
I would say that inheriting Lock objects is preferable to using a Manager.Lock(), because when you use a Manager.Lock, every single call you make to the Lock must be sent via IPC to the Manager process, which will be much slower than using a shared Lock that lives inside the calling process. Both approaches are perfectly valid, though.
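To get a rough feel for that overhead, here is a hypothetical micro-benchmark (not from the original answer; exact numbers will vary by machine) that times acquire/release pairs on a plain mp.Lock versus a Manager().Lock() proxy from a single process; the proxy is typically slower by orders of magnitude because every call is a round trip to the manager process:

import multiprocessing as mp
import time

def time_lock(lock, n=10000):
    """Time n acquire/release pairs on the given lock-like object."""
    start = time.time()
    for _ in xrange(n):
        lock.acquire()
        lock.release()
    return time.time() - start

if __name__ == '__main__':
    plain_lock = mp.Lock()               # backed by a shared OS semaphore
    manager_lock = mp.Manager().Lock()   # proxy; every call goes through IPC
    print 'mp.Lock():           %.4f s' % time_lock(plain_lock)
    print 'mp.Manager().Lock(): %.4f s' % time_lock(manager_lock)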
Finally, it is possible to pass a Lock to all members of a Pool without using a Manager, by using the initializer/initargs keyword arguments:
lock = None

def initialize_lock(l):
    global lock
    lock = l


def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.
    """
    lock = mp.Lock()
    mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))
    queue = mp.Queue()

    iterator = make_iterator(args, queue)
    # Don't pass the lock here. It has to be used as a global in the child.
    # (This means `jobfunc` would need to be re-written slightly.)
    mypool.imap(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)
This works because arguments passed to initargs get passed to the __init__ method of the Process objects that run inside the Pool, so they end up being inherited rather than pickled.
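For completeness, here is a hypothetical sketch (names are my own, not from the original answer) of how the_job could be adapted to use globals set by the initializer. The queue is subject to the same pickling restriction as the lock, so it is shared the same way here; read_queue is the helper from the question's full script:

import multiprocessing as mp

lock = None    # set in every worker by the initializer
queue = None

def initialize_shared(l, q):
    """Pool initializer: store the inherited lock and queue as globals."""
    global lock, queue
    lock = l
    queue = q

def the_job_global(idx):
    """Variant of the_job that uses the global lock and queue
    instead of receiving them through its argument tuple."""
    lock.acquire()
    print ' By run %d \n' % idx
    lock.release()
    queue.put(idx)

def scenario_pool_with_initializer(args, ncores):
    """Pool WITHOUT a Manager; lock and queue are inherited via the initializer."""
    lock = mp.Lock()
    queue = mp.Queue()
    mypool = mp.Pool(ncores, initializer=initialize_shared, initargs=(lock, queue))
    mypool.map(the_job_global, args)   # only the plain indices are pickled now
    mypool.close()
    mypool.join()
    return read_queue(queue)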