在函数内多处理 Python 中的 for 循环

Posted

技术标签:

【中文标题】在函数内多处理 Python 中的 for 循环【英文标题】:Multiprocessing a for loop in Python within a function 【发布时间】:2021-09-17 11:39:56 【问题描述】:

这个问题类似于How to use multiprocessing in a for loop - python 和How to use multiprocessing in a for loop - python ,但这些都不能解决我的问题。函数stateRecognizer()使用函数getCoord(imgDir)检查当前屏幕上是否存在一系列图像,并返回相应的状态。

getCoord(key) 返回一个包含 4 个整数的列表。 getCoord(key) 如果未找到图像,则返回 None。

我的for循环实现

checks = "loadingblack.png": 'loading',
          "loading.png": 'loading',
          "gear.png": 'home',
          "factory.png": 'factory',
          "bathtub.png": 'bathtub',
          "refit.png": 'refit',
          "supply.png": 'supply',
          "dock.png": 'dock',
          "spepage.png": 'spepage',
          "oquest.png": 'quest',
          "quest.png": 'quest'

def stateRecognizer(hint=None):
    for key in checks:
       if (getCoord(key) is not None):
           return checks[key]

当我尝试编写另一个函数并调用它时,它没有返回预期的变量:

def stateChecker(key, value):
    if (getCoord(key) is not None):
        return value

def stateRecognizer():
    with Pool(multiprocessing.cpu_count()) as pool:
        result = pool.map(stateChecker, checks)

输出:

stateChecker() missing 1 required positional argument: 'value'

如何将dict 传递给函数stateChecker

更新 2: 谢谢@tdelaney 和@Nathaniel Ford。

def stateChecker(key, value):
    if (getCoord(key) is not None):
        return value
def stateRecognizer():
    with Pool(multiprocessing.cpu_count()) as mp_pool:
        return mp_pool.starmap(stateChecker, checks.items())

该函数现在返回 [None, None, None, None, 'bathtub', None, None, None, None, None, None] 处理速度较慢(大约慢 12 倍)。我假设每个子进程处理每个子进程的整个字典。此外,有时该函数无法正确读取 JPEG 图像。

Premature end of JPEG file
Premature end of JPEG file
[None, None, None, None, None, None, None, None, None, None, None]
Elapsed time: 7.7098618000000005
Premature end of JPEG file
Premature end of JPEG file
[None, None, None, None, 'bathtub', None, None, None, None, None, None]
Elapsed time: 7.169349200000001

*checks.items()checks 之前使用时

    with Pool(multiprocessing.cpu_count()) as mp_pool:
        return mp_pool.starmap(stateChecker, *checks)

引发异常:

Exception has occurred: TypeError
    starmap() takes from 3 to 4 positional arguments but 13 were given

【问题讨论】:

您是否像错误提示的那样使用if __name__ == '__main__' 保护主模块? 感谢您解决了第 2 部分 @flakes 您可能应该解决您遇到的第二个问题,进入它自己的问题。有几件事可能正在发生,但你应该隔离。此外,根据您的具体操作,您可能会遇到 GIL 问题。 感谢您指出 GIL 的概念。 【参考方案1】:

map 使用单个参数调用目标函数。使用starmap 将一个迭代元组解包为目标函数的参数。由于您的函数是为处理键/值对而编写的,因此您可以使用字典的项迭代器来完成这项工作。

def stateChecker(key, value):
    if (getCoord(key) is not None):
        return value
def stateRecognizer():
    with Pool(multiprocessing.cpu_count()) as mp_pool:
        return mp_pool.starmap(stateChecker, checks.items())

【讨论】:

首先感谢您的回答。该函数现在返回 [None, None, None, None, 'bathtub', None, None, None, None, None, None],处理速度较慢(大约慢 16 倍)。我假设每个子进程处理每个子进程的整个字典。 返回值完全是因为getCoord为每个键返回false,而只是没有为bathtub.png返回None。您的问题几乎肯定存在于getCoord。您是否正在尝试做类似actually_appears_on_the_screen() 的事情? getCoord(key) 如果未找到图像,则返回 None。 actually_appears_on_the_screen() 是我想要实现的(在这种情况下它可能返回 True 或 False),但我发现 getCoord(key) 做同样的事情,如果它返回一个坐标,那么在第一个函数的情况下它是 True ,反之亦然。 不,要检查的键被扇出到子进程。每个人都会得到len(checks)/cpu_count() 块来处理。 getCoord(key) 必须返回很多 None 以使所有 None 出现在结果列表中。与原始非并行代码的一个不同之处在于返回第一个非无匹配项,而不是您与池一起返回的列表。多进程有其自身的开销。一项任务是否具有良好的并行性取决于它正在做什么类型或工作。如果这是一个快速或受磁盘限制的操作,多处理会更慢。 感谢您的澄清【参考方案2】:

在 Python 中有一个稍微不常见的行为:

>>> dx = "a": 1, "b": 2
>>> [print(i) for i in dx]
a
b

基本上,这里只有key 值是迭代的一部分。然而,如果我们使用items(),我们会看到:

>>> dx = "a": 1, "b": 2
>>> [print(i) for i in dx]
a
b

当您在池中调用map 时,它实际上是在使用第一个版本。这意味着,不是将键值对传递给stateChecker,而是只传递键。因此,您的错误“缺少 1 个必需的位置参数”。缺少第二个值。

通过使用starmapitems(),我们可以解决这个问题。如上所示,items 将给出一个元组的迭代器(每个元组的一个键值对来自您的字典)。

def stateRecognizer():
    with Pool(multiprocessing.cpu_count()) as mp_pool:
        return mp_pool.starmap(stateChecker, checks.items())

starmap这里指的是使用*操作符:

>>> def f(a, b):
...   print(f"a is the key for b")
... 
>>> my_tuple = ("a", 1)
>>> f(*my_tuple)
a is the key for 1
>>> f(my_tuple)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: f() missing 1 required positional argument: 'b'

正如您在此处看到的,当用于将值传递给函数时,它会“解包”这些值,将元组(或列表)中的每个值放入一个参数中。您可以,当我们使用* 运算符时,我们会收到与您最初收到的错误非常相似的错误。

还有一些注意事项:

在编写 Python 时,最好坚持使用标准命名格式。对于函数,使用蛇形大小写 (state_checker),对于类使用骆驼形大小写。这有助于您更快地推理,以及更深奥的原因。

这个函数可能行为不端:

 def stateChecker(key, value):
     if (getCoord(key) is not None):
         return value

假设getCoord在一个元组中返回四个整数(原文不清楚),它的类型签名是:

def getCoord(key: Any) -> Tuple[int, int, int, int]:
    ....

这意味着,stateChecker 的类型签名又是:

def stateChecker(key: Any, value: Any) -> Union[None, Tuple[int, int, int, int]]:
    ....

在这种情况下,这是因为如果您的 if 子句计算结果为 false,它将返回 None。在这些情况下,getCoord 很可能会短路,但如果不知道更多,很难说是如何发生的。无论如何,您并没有真正处理 None 返回值。

【讨论】:

以上是关于在函数内多处理 Python 中的 for 循环的主要内容,如果未能解决你的问题,请参考以下文章

apply() 函数中的错误,但 for 循环有效

使用 NumPy 从 Python 中的位置向量中没有 for 循环的 One-Hot 编码?

如何在python中实现函数式编程中的嵌套for循环?

Python中的枚举对象有什么用?怎样用内置函数enumerate()得到枚举对象?Python的for循环和C++的for循环有何区别?Python中for循环的本质是什么?

python中的for循环(基本)

python之for循环与range()函数