在 Python 多处理中将 Pool.map 与共享内存数组结合起来

Posted

技术标签:

【中文标题】在 Python 多处理中将 Pool.map 与共享内存数组结合起来【英文标题】:Combine Pool.map with shared memory Array in Python multiprocessing 【发布时间】:2010-12-13 03:26:54 【问题描述】:

我有一个非常大的(只读)数据数组,希望由多个进程并行处理。

我喜欢 Pool.map 函数,并希望使用它来并行计算该数据上的函数。

我看到可以使用ValueArray 类在进程之间使用共享内存数据。但是当我尝试使用它时,我会在使用 Pool.map 函数时得到一个RuntimeError: 'SynchronizedString objects should only be shared between processes through inheritance

这是我正在尝试做的一个简化示例:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

谁能告诉我我在这里做错了什么?

所以我想做的是在进程池中创建进程后将有关新创建的共享内存分配数组的信息传递给进程。

【问题讨论】:

不幸的是,这是不可能的。根据 mp 文档,推荐的方法是使用继承(在 fork 平台上)。对于您在此处的只读数据,通常会使用全局,但可以使用共享数组进行读/写通信。分叉很便宜,因此您可以在收到数据时重新创建池,然后关闭它。不幸的是,在 Windows 上这是不可能的 - 解决方法是使用共享内存数组(即使在只读情况下),但这只能在进程创建时传递给子进程(我想它们需要添加到访问列表中... 对于共享内存段,除了在子进程启动时没有实现这个逻辑)。您可以像我展示的那样在 Pool 启动时传递共享数据数组,或者以类似的方式传递给 Process。您不能将共享内存数组传递给打开的池 - 您必须在内存之后创建池。解决此问题的简单方法包括分配最大大小的缓冲区,或者在启动池之前知道所需大小时仅分配数组。如果你保持全局变量在 Windows 上也不会太贵 - 全局变量会自动... 腌制并发送到子进程 - 这就是为什么我建议在开始时制作一个足够大小的缓冲区(希望你的全局变量数量很小),然后池更好。我花时间真诚地理解和解决你的问题 - 在你编辑你的问题之前 - 所以虽然我理解你是否想让它运行,但我希望最后你会考虑接受我的回答,如果没有什么实质性不同/更好的结果沿着。 我仔细查看了源代码,并且可以腌制有关共享内存的信息(需要将有关它的信息传递给 Windows 上的客户端进程),但该代码只有一个断言在进程生成期间运行。我想知道为什么会这样。 【参考方案1】:

如果数据是只读的,只需将其作为模块中的变量来自 Pool 的 fork 之前。那么所有的子进程都应该可以访问它,只要你不写它就不会被复制。

import myglobals # anything (empty .py file)
myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )

如果您确实想尝试使用 Array,尽管您可以尝试使用 lock=False 关键字参数(默认情况下为 true)。

【讨论】:

我不相信使用全局变量是安全的,而且肯定不会在进程没有分叉的窗口上工作。 怎么不安全?如果您只需要读取数据就可以了。如果您错误地写入它,则修改后的页面将为子进程在写入时复制,因此不会发生任何不良情况(例如,不会干扰其他进程)。你是对的,但它不会在 Windows 上工作...... 你说得对,它在基于 fork 的平台上是安全的。但是我想知道在创建进程池后是否有基于共享内存的方式来共享大量数据。【参考方案2】:

我看到的问题是 Pool 不支持通过其参数列表酸洗共享数据。这就是错误消息的含义,即“对象只能通过继承在进程之间共享”。共享数据需要被继承,即如果你想使用 Pool 类共享它是全局的。

如果您需要显式传递它们,您可能必须使用 multiprocessing.Process。这是您修改后的示例:

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = ['a', 'b', 's', 'd']
  workers = [Process(target=count_it, args = (q, toShare, key))
    for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),

输出:('s', 9) ('a', 2) ('b', 3) ('d', 12)

队列元素的顺序可能会有所不同。

为了使其更通用并类似于 Pool,您可以创建固定的 N 个进程,将键列表拆分为 N 个片段,然后使用包装函数作为 Process 目标,它将为每个键调用 count_it在传递的列表中,例如:

def wrapper( q, arr, keys ):
  for k in keys:
    count_it(q, arr, k)

【讨论】:

【参考方案3】:

刚看到赏金就再试一次;)

基本上我认为错误消息的意思是它所说的 - 多处理共享内存数组不能作为参数传递(通过酸洗)。序列化数据没有意义——关键是数据是共享内存。所以你必须使共享数组成为全局的。我认为将它作为模块的属性更简洁,就像在我的第一个答案中一样,但是在您的示例中将其作为全局变量也可以很好地工作。考虑到您不想在分叉之前设置数据的观点,这是一个修改后的示例。如果您想拥有多个可能的共享数组(这就是您想要将 toShare 作为参数传递的原因),您可以类似地创建一个共享数组的全局列表,并将索引传递给 count_it(将变为 for c in toShare[i]:) .

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

[编辑:由于不使用 fork,以上内容在 Windows 上不起作用。但是,以下在 Windows 上确实有效,仍然使用 Pool,所以我认为这是最接近您想要的:

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

不知道为什么 map 不会 Pickle 数组,但 Process 和 Pool 会 - 我想它可能已经在 windows 上的子进程初始化时转移了。请注意,数据仍然是在 fork 之后设置的。

【讨论】:

即使在有 fork 的平台上,也不能在 fork 之后将新的共享数据插入 toShare,因为此时每个进程都有自己的独立副本。 所以真正的问题似乎是我们如何腌制有关数组的信息,以便可以从其他进程发送和连接它。 @James - 不,这是不对的。该数组必须在 fork 之前设置,但它是可以更改的共享内存,所有子节点都可以看到更改。看这个例子 - 我把数据放入数组 after 叉子(当 Pool() 被实例化时发生)。该数据可以在运行时在分叉之后获取,只要它适合预分配的共享内存段,就可以在那里复制并从所有子代中看到。 您可以腌制数组,但不能使用池。 编辑添加工作 Windows 版本,仅使用池(通过将共享数组作为初始化参数传递。【参考方案4】:

如果你看到:

RuntimeError: 同步对象只能通过继承在进程之间共享

考虑使用multiprocessing.Manager,因为它没有这个限制。经理工作时认为它可能完全在一个单独的进程中运行。

import ctypes
import multiprocessing

# Put this in a method or function, otherwise it will run on import from each module:
manager = multiprocessing.Manager()
counter = manager.Value(ctypes.c_ulonglong, 0)
counter_lock = manager.Lock()  # pylint: disable=no-member

with counter_lock:
    counter.value = count = counter.value + 1

【讨论】:

这是我在使用multiprocessing.Pool 时实际得到的唯一建议......而且我不需要明确处理manager.Lock @raphael 您是否断言该值具有隐式锁定?显式锁用于防止竞争条件,从而防止在从多个进程更新计数时出现错误计数。

以上是关于在 Python 多处理中将 Pool.map 与共享内存数组结合起来的主要内容,如果未能解决你的问题,请参考以下文章

CHUNKSIZE无关的多处理在Python / pool.map?

在Python中将多个参数传递给pool.map()函数[重复]

如何使用 Python 多处理 Pool.map 在 for 循环中填充 numpy 数组

在集成 Python 的多处理中使用 Pool.map 时,程序运行速度越来越慢

Python - 多进程map的使用方法

python multiprocessing pool.map() 等到方法完成