具有多处理锁的共享计数器在 Windows 中不起作用

Posted

技术标签:

【中文标题】具有多处理锁的共享计数器在 Windows 中不起作用【英文标题】:Shared counter with multiprocessing Lock is not working in Windows 【发布时间】:2021-11-28 04:13:48 【问题描述】:

我想在multiprocessing.Pool 中有一个共享计数器,我使用以下代码打印不同的输入列表:

import multiprocessing

running = multiprocessing.Value('i', 0)

def f(x):
    global running
    global lock

    # ... code ...

    with lock:
        running.value -= 1
        print(f"Still running: running.value\n", end='', flush=True)

    return x

if __name__ == '__main__':
    lock = multiprocessing.Lock()

    rangeval = range(100)
    running.value = len(rangeval)

    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    result = pool.map(f, iterable=rangeval)

这在 Mac 和 Linux 中运行良好。但是当我在 Windows 中运行它时,它会产生一个错误:

  File "C:\...\...\...\...\main.py", line 11, in f
    with lock:
       NameError: name 'lock' is not defined

当我将lock = multiprocessing.Lock() 放在if __name__ == '__main__' 之外的函数f 之上时,它会产生如下奇怪的输出:

Still running: -1
Still running: -2
Still running: -3
Still running: -4
Still running: -1
Still running: -2
Still running: -3
Still running: -4

如何在 Windows 中解决这个问题?

【问题讨论】:

您尝试过lock = None 外主要吗?然后你将它分配到 main. 是的,它在函数f 中再次产生错误,如with lock: AttributeError: __enter__。我把lock = None放在函数f之前 【参考方案1】:

这可以通过调用在 macOS 和 Linux 上不起作用

multiprocessing.set_start_method("spawn", force=True)

(在这些操作系统上,默认值可能是fork。)

您不需要单独的锁; Values have a lock of their own.

您需要跳过一些环节才能在子进程初始化时正确地将共享内存值移动到子进程中。 (灵感来自this answer。)

import multiprocessing

# multiprocessing.set_start_method("spawn", force=True)

running: multiprocessing.Value  # assigned in initproc


def f(x):
    with running.get_lock():
        running.value -= 1
        print(f"Still running: running.value\n", end="", flush=True)

    return x


def initproc(r):
    global running
    running = r


def main():
    running = multiprocessing.Value("i", 0)
    rangeval = range(10)
    running.value = len(rangeval)

    with multiprocessing.Pool(
        processes=multiprocessing.cpu_count(), initializer=initproc, initargs=(running,)
    ) as pool:
        pool.map(f, iterable=rangeval)


if __name__ == "__main__":
    main()

【讨论】:

【参考方案2】:

需要将 multiprocessing.Value 对象传递给子进程。下面是一个简单的例子,可以帮助你理解它的用法:

from multiprocessing import Process, Value


def p1(v):
    with v.get_lock():
        v.value += 1


def p2(v):
    with v.get_lock():
        v.value -= 1


if __name__ == '__main__':
    v = Value('i', 0)
    plist = []
    for _ in range(10):
        for p in [p1, p2]:
            _p = Process(target=p, args=[v])
            plist.append(_p)
            _p.start()
    for p in plist:
        p.join()
    assert v.value == 0

这里我们创建了一个初始化为零的 Value 对象。我们有两个函数将作为子进程运行。 p1 将增加值,p2 将减少它。我们每个 p1 和 p2 运行 10 个(有效的)并发实例。换句话说,该值将增加 10 倍并减少 10 倍最终为零

【讨论】:

以上是关于具有多处理锁的共享计数器在 Windows 中不起作用的主要内容,如果未能解决你的问题,请参考以下文章

如果不存在在 Windows 批处理中不起作用

Python 多处理和共享计数器

多处理在python web-scraping中不起作用

Astyle 在 Windows 中不起作用

Toast 通知在 Xamarin UWP Windows 应用程序中不起作用

Windows 身份验证在 Flex 中不起作用