在函数内多处理 Python 中的 for 循环
Posted
技术标签:
【中文标题】在函数内多处理 Python 中的 for 循环【英文标题】:Multiprocessing a for loop in Python within a function 【发布时间】:2021-09-17 11:39:56 【问题描述】:这个问题类似于How to use multiprocessing in a for loop - python
和How to use multiprocessing in a for loop - python
,但这些都不能解决我的问题。函数stateRecognizer()
使用函数getCoord(imgDir)
检查当前屏幕上是否存在一系列图像,并返回相应的状态。
getCoord(key)
返回一个包含 4 个整数的列表。 getCoord(key)
如果未找到图像,则返回 None。
我的for循环实现
checks = "loadingblack.png": 'loading',
"loading.png": 'loading',
"gear.png": 'home',
"factory.png": 'factory',
"bathtub.png": 'bathtub',
"refit.png": 'refit',
"supply.png": 'supply',
"dock.png": 'dock',
"spepage.png": 'spepage',
"oquest.png": 'quest',
"quest.png": 'quest'
def stateRecognizer(hint=None):
for key in checks:
if (getCoord(key) is not None):
return checks[key]
当我尝试编写另一个函数并调用它时,它没有返回预期的变量:
def stateChecker(key, value):
if (getCoord(key) is not None):
return value
def stateRecognizer():
with Pool(multiprocessing.cpu_count()) as pool:
result = pool.map(stateChecker, checks)
输出:
stateChecker() missing 1 required positional argument: 'value'
如何将dict
传递给函数stateChecker
?
更新 2: 谢谢@tdelaney 和@Nathaniel Ford。
def stateChecker(key, value):
if (getCoord(key) is not None):
return value
def stateRecognizer():
with Pool(multiprocessing.cpu_count()) as mp_pool:
return mp_pool.starmap(stateChecker, checks.items())
该函数现在返回 [None, None, None, None, 'bathtub', None, None, None, None, None, None] 处理速度较慢(大约慢 12 倍)。我假设每个子进程处理每个子进程的整个字典。此外,有时该函数无法正确读取 JPEG 图像。
Premature end of JPEG file
Premature end of JPEG file
[None, None, None, None, None, None, None, None, None, None, None]
Elapsed time: 7.7098618000000005
Premature end of JPEG file
Premature end of JPEG file
[None, None, None, None, 'bathtub', None, None, None, None, None, None]
Elapsed time: 7.169349200000001
当*
在checks.items()
或checks
之前使用时
with Pool(multiprocessing.cpu_count()) as mp_pool:
return mp_pool.starmap(stateChecker, *checks)
引发异常:
Exception has occurred: TypeError
starmap() takes from 3 to 4 positional arguments but 13 were given
【问题讨论】:
您是否像错误提示的那样使用if __name__ == '__main__'
保护主模块?
感谢您解决了第 2 部分 @flakes
您可能应该解决您遇到的第二个问题,进入它自己的问题。有几件事可能正在发生,但你应该隔离。此外,根据您的具体操作,您可能会遇到 GIL 问题。
感谢您指出 GIL 的概念。
【参考方案1】:
map
使用单个参数调用目标函数。使用starmap
将一个迭代元组解包为目标函数的参数。由于您的函数是为处理键/值对而编写的,因此您可以使用字典的项迭代器来完成这项工作。
def stateChecker(key, value):
if (getCoord(key) is not None):
return value
def stateRecognizer():
with Pool(multiprocessing.cpu_count()) as mp_pool:
return mp_pool.starmap(stateChecker, checks.items())
【讨论】:
首先感谢您的回答。该函数现在返回[None, None, None, None, 'bathtub', None, None, None, None, None, None]
,处理速度较慢(大约慢 16 倍)。我假设每个子进程处理每个子进程的整个字典。
返回值完全是因为getCoord
为每个键返回false
,而只是没有为bathtub.png
返回None
。您的问题几乎肯定存在于getCoord
。您是否正在尝试做类似actually_appears_on_the_screen()
的事情?
getCoord(key)
如果未找到图像,则返回 None。 actually_appears_on_the_screen()
是我想要实现的(在这种情况下它可能返回 True 或 False),但我发现 getCoord(key)
做同样的事情,如果它返回一个坐标,那么在第一个函数的情况下它是 True ,反之亦然。
不,要检查的键被扇出到子进程。每个人都会得到len(checks)/cpu_count()
块来处理。 getCoord(key)
必须返回很多 None
以使所有 None
出现在结果列表中。与原始非并行代码的一个不同之处在于返回第一个非无匹配项,而不是您与池一起返回的列表。多进程有其自身的开销。一项任务是否具有良好的并行性取决于它正在做什么类型或工作。如果这是一个快速或受磁盘限制的操作,多处理会更慢。
感谢您的澄清【参考方案2】:
在 Python 中有一个稍微不常见的行为:
>>> dx = "a": 1, "b": 2
>>> [print(i) for i in dx]
a
b
基本上,这里只有key
值是迭代的一部分。然而,如果我们使用items()
,我们会看到:
>>> dx = "a": 1, "b": 2
>>> [print(i) for i in dx]
a
b
当您在池中调用map
时,它实际上是在使用第一个版本。这意味着,不是将键值对传递给stateChecker
,而是只传递键。因此,您的错误“缺少 1 个必需的位置参数”。缺少第二个值。
通过使用starmap
和items()
,我们可以解决这个问题。如上所示,items 将给出一个元组的迭代器(每个元组的一个键值对来自您的字典)。
def stateRecognizer():
with Pool(multiprocessing.cpu_count()) as mp_pool:
return mp_pool.starmap(stateChecker, checks.items())
starmap
这里指的是使用*
操作符:
>>> def f(a, b):
... print(f"a is the key for b")
...
>>> my_tuple = ("a", 1)
>>> f(*my_tuple)
a is the key for 1
>>> f(my_tuple)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: f() missing 1 required positional argument: 'b'
正如您在此处看到的,当用于将值传递给函数时,它会“解包”这些值,将元组(或列表)中的每个值放入一个参数中。您可以,当我们不使用*
运算符时,我们会收到与您最初收到的错误非常相似的错误。
还有一些注意事项:
在编写 Python 时,最好坚持使用标准命名格式。对于函数,使用蛇形大小写 (state_checker
),对于类使用骆驼形大小写。这有助于您更快地推理,以及更深奥的原因。
这个函数可能行为不端:
def stateChecker(key, value):
if (getCoord(key) is not None):
return value
假设getCoord
在一个元组中返回四个整数(原文不清楚),它的类型签名是:
def getCoord(key: Any) -> Tuple[int, int, int, int]:
....
这意味着,stateChecker
的类型签名又是:
def stateChecker(key: Any, value: Any) -> Union[None, Tuple[int, int, int, int]]:
....
在这种情况下,这是因为如果您的 if
子句计算结果为 false
,它将返回 None
。在这些情况下,getCoord
很可能会短路,但如果不知道更多,很难说是如何发生的。无论如何,您并没有真正处理 None
返回值。
【讨论】:
以上是关于在函数内多处理 Python 中的 for 循环的主要内容,如果未能解决你的问题,请参考以下文章
使用 NumPy 从 Python 中的位置向量中没有 for 循环的 One-Hot 编码?
Python中的枚举对象有什么用?怎样用内置函数enumerate()得到枚举对象?Python的for循环和C++的for循环有何区别?Python中for循环的本质是什么?