无法将动态类与 concurrent.futures.ProcessPoolExecutor 一起使用

Posted

技术标签:

【中文标题】无法将动态类与 concurrent.futures.ProcessPoolExecutor 一起使用【英文标题】:Unable to use dynamic classes with concurrent.futures.ProcessPoolExecutor 【发布时间】:2021-02-15 07:32:08 【问题描述】:

在下面的代码中,我使用generate_object 方法在_py 属性内动态创建类的对象。

如果我不使用并发方法,代码可以完美运行。但是,如果我使用来自 concurrent.futures 的并发,我不会得到想要的结果,因为错误提示(除其他外):

_pickle.PicklingError: Can't pickle <class '__main__.Script_0_1'>: attribute lookup Script_0_1 on __main__ failed

在谷歌搜索此错误后,我了解到只有可腌制对象才能作为参数传递给 ProcessPoolExecutor.map(),所以我决定看看如何将我的动态类变成可腌制。

问题是这个问题的所有其他解决方案都以不同的方式创建动态对象(与我在_string_to_object() 中使用的不同)。示例:1 和 2

我非常希望保持现在的动态对象创建方式,因为我的很多真实代码都是基于它的,因此我正在寻找与下面这个玩具代码一起使用的并发解决方案。

代码

import random
import codecs
import re
from concurrent.futures import ProcessPoolExecutor
import multiprocessing

class A:
    def __init__(self):
        self._py = r'''
class Script_0_1:
\tdef print_numbers(self):
\t\tprint('Numbers = ', 0, 'and', 1)
'''
    
    def generate_text(self, name_1, name_2):
        py = self._py.format(name_1, name_2)
        py = codecs.decode(py, 'unicode_escape')
        return py

    def generate_object(self, number_1, number_2):
        """ Generate an object of the class inside the string self._py """

        return self._string_to_object(self.generate_text(number_1, number_2))

    def _string_to_object(self, str_class, *args, **kwargs):
        """ Transform a program written inside str_class to an object. """

        exec(str_class)
        class_name = re.search("class (.*):", str_class).group(1).partition("(")[0]
        return locals()[class_name](*args, **kwargs)

from functools import partial

print('Single usage')
a = A()
script = a.generate_object(1, 2)
script.print_numbers()

print('Multiprocessing usage')
n_cores = 3
n_calls = 3

def concurrent_function(args):
    first_A = args[0]
    second_A = args[1]
    first_A.print_numbers()
    second_A.print_numbers()

with ProcessPoolExecutor(max_workers=n_cores) as executor:
    args = ( (A().generate_object(i, i+1), A().generate_object(i+1, i+2)) for i in range(n_calls))
    results = executor.map(concurrent_function, args)

【问题讨论】:

严格来说问题在于多处理而不是concurrent.futures 本身。如果您将ProcessPoolExecutorClass 替换为concurrent.futures 中的ThreadPoolExecutor 类,则不会有问题。如果确实需要多处理,我建议您删除concurrency 标签并添加multiprocessing 标签。 如果您仔细查看错误消息,您将看到Can't pickle &lt;class '__main__.Script_0_1'。问题归根结底是 Script_0_1 类是 A 类的内部,但正在全局范围内寻找。酸洗在内部类上效果不佳。 谢谢@Booboo,你知道如何让内部类在全局范围内可见吗?不幸的是,谷歌搜索不会返回有用的结果。 在下面查看我的答案。 【参考方案1】:

我想不出一种方法来在严格遵守您当前方案的全局名称空间中创建 Script 类。然而:

既然每次调用generate_object 方法,您都会在本地命名空间中创建一个新类并实例化该类的一个对象,为什么不推迟这项工作,让它在进程池中完成呢?这还具有并行执行此类创建处理的额外优势,并且不需要酸洗。我们现在将两个整数参数 number_1number_2 传递给 concurrent_function

import random
import codecs
import re
from concurrent.futures import ProcessPoolExecutor


class A:
    def __init__(self):
        self._py = r'''
class Script_0_1:
\tdef print_numbers(self):
\t\tprint('Numbers = ', 0, 'and', 1)
'''

    def generate_text(self, name_1, name_2):
        py = self._py.format(name_1, name_2)
        py = codecs.decode(py, 'unicode_escape')
        return py

    def generate_object(self, number_1, number_2):
        """ Generate an object of the class inside the string self._py """

        return self._string_to_object(self.generate_text(number_1, number_2))

    def _string_to_object(self, str_class, *args, **kwargs):
        """ Transform a program written inside str_class to an object. """

        exec(str_class)
        class_name = re.search("class (.*):", str_class).group(1).partition("(")[0]
        return locals()[class_name](*args, **kwargs)

"""
from functools import partial

print('Single usage')
a = A()
script = a.generate_object(1, 2)
script.print_numbers()
"""


def concurrent_function(args):
    for arg in args:
        obj = A().generate_object(arg[0], arg[1])
        obj.print_numbers()

def main():
    print('Multiprocessing usage')
    n_cores = 3
    n_calls = 3

    with ProcessPoolExecutor(max_workers=n_cores) as executor:
        args = ( ((i, i+1), (i+1, i+2)) for i in range(n_calls))
        # wait for completion of all tasks:
        results = list(executor.map(concurrent_function, args))

if __name__ == '__main__':
    main()

打印:

Multiprocessing usage
Numbers =  0 and 1
Numbers =  1 and 2
Numbers =  1 and 2
Numbers =  2 and 3
Numbers =  2 and 3
Numbers =  3 and 4

更高效的方式

没有必要使用exec。而是使用闭包:

from concurrent.futures import ProcessPoolExecutor

def make_print_function(number_1, number_2):
    def print_numbers():
        print(f'Numbers = number_1 and number_2')

    return print_numbers



def concurrent_function(args):
    for arg in args:
        fn = make_print_function(arg[0], arg[1])
        fn()


def main():
    print('Multiprocessing usage')
    n_cores = 3
    n_calls = 3

    with ProcessPoolExecutor(max_workers=n_cores) as executor:
        args = ( ((i, i+1), (i+1, i+2)) for i in range(n_calls))
        # wait for completion of all tasks:
        results = list(executor.map(concurrent_function, args))

if __name__ == '__main__':
    main()

打印:

Multiprocessing usage
Numbers = 0 and 1
Numbers = 1 and 2
Numbers = 1 and 2
Numbers = 2 and 3
Numbers = 2 and 3
Numbers = 3 and 4

使用对象缓存避免不必要地创建新对象

obj_cache =  # each process will have its own

def concurrent_function(args):
    for arg in args:
        # was an object created with this set of arguments: (arg[0], arg[1])?
        obj = obj_cache.get(arg)
        if obj is None: # must create new object
            obj = A().generate_object(arg[0], arg[1])
            obj_cache[arg] = obj # save object for possible future use
        obj.print_numbers()

【讨论】:

这绝对解决了我的问题,谢谢!不幸的是 exec() 是一种计算成本相当高的方法,它有点“挫败”了并行化代码的目的。在我真正的代码问题中,这个解决方案总体上加快了速度,但如果 exec() 只执行一次会更好。希望有人会提出一个解决方案来实现这一点。如果没有,我会接受这个答案(并奖励赏金)。再次感谢! 我有一个重要的更新,它使用原来的类并稍微简化了代码, 而且我添加了一个不使用exec的版本,这样效率会高很多。但是对于这个特定的示例,不清楚为什么需要这两种技术。我想知道你的实际问题是什么。 再次感谢您改进答案!不幸的是,特别是对于我的情况,使用闭包无济于事。在我的真实代码中,我正在使用代码合成,虽然为了清楚起见,这里的示例很简单(只有一个 print() 函数),但 A 类中的方法 print_numbers() 是合成的,因此我不会能够按照您的建议对其进行硬编码。这就是为什么我可能仍然需要exec() 函数的原因。但我的案例是一个非常具体的案例,您的解决方案绝对可以帮助其他人。 我在您发布的案例中看到您正在使用相同的参数构造您的 Script 类实例。如果这在现实世界中是可能的并且构造的对象是可重用的,那么缓存对象将是一种效率。我添加了代码来演示您将如何为您发布的案例执行此操作。【参考方案2】:

可能我找到了一种不需要exec() 函数的方法。实现(使用 cmets)如下。

import codecs
from concurrent.futures import ProcessPoolExecutor

class A:
    def __init__(self):
        self.py = r'''
class Script_0_1:
\tdef print_numbers(self):
\t\tprint('Numbers = ', 0, 'and', 1)
'''
    def generate_text(self, number_1, number_2):
        py = self.py.format(number_1, number_2)
        py = codecs.decode(py, 'unicode_escape')
        return py

    def generate_object(self, number_1, number_2):
        class_code = self.generate_text(number_1, number_2)
        # Create file in disk
        with open("Script_" + str(number_1) + "_" + str(number_2) + ".py", "w") as file:
            file.write(class_code)
        # Now import it and the class will now be (correctly) seen in __main__
        package = "Script_" + str(number_1) + "_" + str(number_2)
        class_name = "Script_" + str(number_1) + "_" + str(number_2)
        # This is the programmatically version of 
        # from <package> import <class_name>
        class_name = getattr(__import__(package, fromlist=[class_name]), class_name)
        return class_name()

def concurrent_function(args):
    first_A = args[0]
    second_A = args[1]
    first_A.print_numbers()
    second_A.print_numbers()

def main():
    print('Multiprocessing usage')
    n_cores = 3
    n_calls = 2
    
    with ProcessPoolExecutor(max_workers=n_cores) as executor:
        args = ( (A().generate_object(i, i+1), A().generate_object(i+2, i+3)) for i in range(n_calls))
        results = executor.map(concurrent_function, args)

if __name__ == '__main__':
    main()

基本上我所做的不是动态分配类,而是将其写入文件。我这样做是因为我遇到的问题的根源是 pickle 在查看全局范围时无法正确定位嵌套类。现在我正在以编程方式导入该类(将其保存到文件之后)。

当然,这种解决方案也有处理文件的瓶颈,成本也很高。我没有衡量处理文件或exec 是否更快,但在我的实际情况下,我只需要合成类的一个对象(而不是像提供的玩具代码中那样每个并行调用一个对象),因此文件选项最适合我。

还有一个问题:使用n_calls = 15(例如)并多次执行后,有时似乎无法导入模块(刚刚创建的文件)。我试图在以编程方式导入之前放置一个sleep(),但它没有帮助。使用少量调用时似乎不会发生此问题,并且似乎也是随机发生的。部分错误堆栈的示例如下所示:

Traceback (most recent call last):
  File "main.py", line 45, in <module>
    main()
  File "main.py", line 42, in main
    results = executor.map(concurrent_function, args)
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 674, in map
    results = super().map(partial(_process_chunk, fn),
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 600, in map
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 600, in <listcomp>
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 184, in _get_chunks
    chunk = tuple(itertools.islice(it, chunksize))
  File "main.py", line 41, in <genexpr>
    args = ( (A().generate_object(i, i+1), A().generate_object(i+2, i+3)) for i in range(n_calls))
  File "main.py", line 26, in generate_object
    class_name = getattr(__import__(package, fromlist=[class_name]), class_name)
ModuleNotFoundError: No module named 'Script_13_14'

【讨论】:

以上是关于无法将动态类与 concurrent.futures.ProcessPoolExecutor 一起使用的主要内容,如果未能解决你的问题,请参考以下文章

并发实现-Callable/Future 实现返回值控制的线程

95-24-030-Future-ChannelFuture

JUC线程池扩展可回调的Future

raise ValueError(err) - 在 Python 中使用 concurrent.future 实现多线程

Java并发编程-扩展可回调的Future

播放框架错误:类型不匹配 - 发现 scala.concurrent.Future[play.api.mvc.Result] required: play.api.mvc.Result