如何在 Python 中使用 multiprocessing.pool 创建全局锁/信号量?

Posted

技术标签:

【中文标题】如何在 Python 中使用 multiprocessing.pool 创建全局锁/信号量?【英文标题】:How to create global lock/semaphore with multiprocessing.pool in Python? 【发布时间】:2015-04-24 05:48:00 【问题描述】:

我想限制子进程中的资源访问。比如-limit http downloadsdisk io等。如何扩展这个基本代码来实现呢?

请分享一些基本的代码示例。

pool = multiprocessing.Pool(multiprocessing.cpu_count())
while job_queue.is_jobs_for_processing():
  for job in job_queue.pull_jobs_for_processing:
    pool.apply_async(do_job, callback = callback)
pool.close()
pool.join()

【问题讨论】:

您想限制资源访问的方式是使用LockSemaphore?有什么理由不只使用multiprocessing.Lock / multiprocessing.Semaphore @dano 如何将 multiprocessing.Lock() 或 Semaphore() 传递给池?共享全局锁的选项有哪些? 限制资源访问的需要并不意味着进程池需要任何同步,而是工作任务。你为什么不确切地解释你想要完成什么? @MichaelFoukarakis 为什么?并不重要,重要的是如何?。我可以回答你为什么?因为随机 io 比顺序 io 慢 - 我回答你的问题了吗?请参阅统计数据 - goo.gl/TbC2xp。 Memcache 的工作方式不同于磁盘和硬盘,而不是闪存(它通常被命名为磁盘,但它不是磁盘)或 www 服务器 - 有些需要信号量有些不需要 - 无论我需要什么,都可以像许多其他人一样学习 Python 中多处理的全局信号量模式。 @MichaelFoukarakis WWW 服务器需要信号量以保持礼貌,并且不会拒绝具有大量并行请求的站点 - 它不受设计限制,而是受互联网道德限制。 【参考方案1】:

如果您正在访问资源,请使用全局信号量并获取它。例如:

import multiprocessing
from time import sleep

semaphore = multiprocessing.Semaphore(2)

def do_job(id):
    with semaphore:
        sleep(1)
    print("Finished job")

def main():
    pool = multiprocessing.Pool(6)
    for job_id in range(6):
        print("Starting job")
        pool.apply_async(do_job, [job_id])
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()

这个程序每秒只完成两个作业,因为其他线程正在等待信号量。

【讨论】:

【参考方案2】:

在创建池时使用初始化器和 initargs 参数,以便在所有子进程中定义一个全局。

例如:

from multiprocessing import Pool, Lock
from time import sleep

def do_job(i):
    "The greater i is, the shorter the function waits before returning."
    with lock:
        sleep(1-(i/10.))
        return i

def init_child(lock_):
    global lock
    lock = lock_

def main():
    lock = Lock()
    poolsize = 4
    with Pool(poolsize, initializer=init_child, initargs=(lock,)) as pool:
        results = pool.imap_unordered(do_job, range(poolsize))
        print(list(results))

if __name__ == "__main__":
    main()

此代码将按升序(提交作业的顺序)打印出数字 0-3,因为它使用了锁。注释掉with lock: 行以查看它以降序打印出数字。

此解决方案适用于 windows 和 unix。但是,因为进程可以在 unix 系统上分叉,所以 unix 只需要在模块范围内声明全局变量。子进程获取父进程内存的副本,其中包括仍然有效的锁定对象。因此,初始化程序并不是严格需要的,但它可以帮助记录代码的预期工作方式。当 multiprocessing 能够通过 fork 创建进程时,以下也可以。

from multiprocessing import Pool, Lock
from time import sleep

lock = Lock()

def do_job(i):
    "The greater i is, the shorter the function waits before returning."
    with lock:
        sleep(1-(i/10.))
        return i

def main():
    poolsize = 4
    with Pool(poolsize) as pool:
        results = pool.imap_unordered(do_job, range(poolsize))
        print(list(results))

if __name__ == "__main__":
    main()

【讨论】:

我正在研究第二个示例,但看起来 lock = Lock() 不是全局的,因为没有被主人传递 - 我错了吗? 如果第一个例子是同样的问题 - 我会测试它 - 代码看起来不错,但我认为子进程在 Windows 下不会知道父进程。 在第二个例子中,当池创建子进程(在unix上)时,整个父进程的内存被复制到子进程(包括锁对象)。由于您使用的是 windows,请不要使用第二个示例。

以上是关于如何在 Python 中使用 multiprocessing.pool 创建全局锁/信号量?的主要内容,如果未能解决你的问题,请参考以下文章

Python多任务教程

准确确定在 Python 多处理期间腌制的内容

python并发之multiprocessing

python 进程池的使用

使用 Python 多处理进行通信的 OSX 和 Linux 之间的性能差异

初学Python,debug时报错,不知道原因在哪