我可以在 Pool.imap 调用的函数中使用多处理队列吗?

Posted

技术标签:

【中文标题】我可以在 Pool.imap 调用的函数中使用多处理队列吗?【英文标题】:Can I use a multiprocessing Queue in a function called by Pool.imap? 【发布时间】:2011-04-19 03:07:19 【问题描述】:

我正在使用 python 2.7,并尝试在自己的进程中运行一些 CPU 繁重的任务。我希望能够将消息发送回父进程,以使其了解进程的当前状态。多处理队列似乎很适合这个,但我不知道如何让它工作。

所以,这是我的基本工作示例减去队列的使用。

import multiprocessing as mp
import time

def f(x):
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print str(results.next())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

我尝试以多种方式传递队列,但它们收到错误消息“RuntimeError:队列对象只能通过继承在进程之间共享”。这是我根据我找到的早期答案尝试的方法之一。 (我在尝试使用 Pool.map_async 和 Pool.imap 时遇到同样的问题)

import multiprocessing as mp
import time

def f(args):
    x = args[0]
    q = args[1]
    q.put(str(x))
    time.sleep(0.1)
    return x*x

def main():
    q = mp.Queue()
    pool = mp.Pool()
    results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))

    print str(q.get())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

最后,0 适应度方法(使其全局化)不会生成任何消息,它只是锁定。

import multiprocessing as mp
import time

q = mp.Queue()

def f(x):
    q.put(str(x))
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print q.get()

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

我知道它可能会直接与 multiprocessing.Process 一起使用,并且还有其他库可以实现这一点,但我不想放弃非常适合的标准库函数,直到我确定它不是只是我缺乏知识使我无法利用它们。

谢谢。

【问题讨论】:

你有没有考虑过使用水壶:luispedro.org/software/jug? 【参考方案1】:

诀窍是将队列作为参数传递给初始化程序。似乎适用于所有 Pool 调度方法。

import multiprocessing as mp

def f(x):
    f.q.put('Doing: ' + str(x))
    return x*x

def f_init(q):
    f.q = q

def main():
    jobs = range(1,6)

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, jobs)
    p.close()

    for i in range(len(jobs)):
        print q.get()
        print results.next()

if __name__ == '__main__':
    main()

【讨论】:

很好地展示了 initializerinitargs 参数的用途和有用性 multiprocessing.Pool! 你能解释一下,为什么它有效吗?当您执行 f.q = q 时会发生什么? @kepkin 在 Python 中,每个函数都是一个对象(参见 docs.python.org/reference/…Callable Types)。因此,f.q 在函数对象 f 上设置了一个名为 q 的属性。这只是保存 Queue 对象以供以后使用的一种快速且轻量级的方法。 f.q = q 不是猴子补丁的例子吗? ***.com/questions/5626193/what-is-monkey-patch 这让我能够将多处理日志记录模式 (plumberjack.blogspot.com.au/2010/09/…) 应用于异步方法。

以上是关于我可以在 Pool.imap 调用的函数中使用多处理队列吗?的主要内容,如果未能解决你的问题,请参考以下文章

进程池Pool的imap方法解析

Python多处理池:完成任何k个作业后终止进程

函数基础

如何在IDEA中排除多处调用产生的依赖冲突

同一个接口里的数据,多处用小方法

使用 Pyglet 进行多处理会打开一个新窗口