发球台发电机上的多处理

Posted

技术标签:

【中文标题】发球台发电机上的多处理【英文标题】:multiprocessing on tee'd generators 【发布时间】:2017-01-26 12:32:30 【问题描述】:

考虑以下脚本,我在其中测试了两种对itertools.tee 获得的生成器执行某些计算的方法:

#!/usr/bin/env python3

from sys import argv
from itertools import tee
from multiprocessing import Process

def my_generator():
    for i in range(5):
        print(i)
        yield i

def double(x):
    return 2 * x

def compute_double_sum(iterable):
    s = sum(map(double, iterable))
    print(s)

def square(x):
    return x * x

def compute_square_sum(iterable):
    s = sum(map(square, iterable))
    print(s)

g1, g2 = tee(my_generator(), 2)

try:
    processing_type = argv[1]
except IndexError:
    processing_type = "no_multi"

if processing_type == "multi":
    p1 = Process(target=compute_double_sum, args=(g1,))
    p2 = Process(target=compute_square_sum, args=(g2,))
    print("p1 starts")
    p1.start()
    print("p2 starts")
    p2.start()
    p1.join()
    print("p1 finished")
    p2.join()
    print("p2 finished")
else:
    compute_double_sum(g1)
    compute_square_sum(g2)

这是我在“正常”模式下运行脚本时获得的结果:

$ ./test_tee.py 
0
1
2
3
4
20
30

这里是并行模式:

$ ./test_tee.py multi
p1 starts
p2 starts
0
1
2
3
4
20
0
1
2
3
4
30
p1 finished
p2 finished

最初的生成器显然被“复制”了两次。

我想避免这种情况,因为在我的实际应用程序中,这似乎会在我用来制作初始生成器 (https://github.com/pysam-developers/pysam/issues/397) 的外部库之一中引发错误,并且仍然能够在在相同的生成值上并行。

有没有办法实现我想要的?

【问题讨论】:

这是预期的行为,因为当您使用multiprocessing 时,整个进程都是分叉的,因此每个进程都有自己的迭代器副本。至少,这就是在 Linux / Unix 上发生的事情。在 Windows 上,它会崩溃,因为它不使用 fork for multiprocessing。相反,您的代码会因TypeError: can't pickle generator objects 而崩溃。 为什么需要迭代器?你能计算出列表中的结果吗?您应该以 share nothing 架构为目标——多处理允许共享代码确实是相当不幸的。你能用Pool.imap吗? @AnttiHaapala 在我的实际应用案例中,我不希望计算整个列表,因为我的生成器可能会产生数千万或数亿个不可忽略大小的项目。我怕内存饱和。使用 Pool 确实似乎是一种潜在的解决方法。 【参考方案1】:

我在这里找到了一些替代方法:https://***.com/a/26873783/1878788。

在这种方法中,我们不再使用生成器。我们只是复制其生成的项目并将它们提供给一个复合函数,该函数仅在一个进程中对生成的项目进行并行处理,但我们通过使用Pool 来利用多处理(这就是所谓的 map/reduce 方法吗? ?):

#!/usr/bin/env python3

from itertools import starmap
from multiprocessing import Pool
from functools import reduce
from operator import add

def my_generator():
    for i in range(5):
        print(i)
        yield i

def double(x):
    return 2 * x

def square(x):
    return x * x

def double_and_square(args_list):
    return (double(*args_list[0]), square(*args_list[1]))

def sum_tuples(tup1, tup2):
    return tuple(starmap(add, zip(tup1, tup2)))

with Pool(processes=5) as pool:
    results_generator = pool.imap_unordered(double_and_square, (((arg,), (arg,)) for arg in my_generator()))

    print(reduce(sum_tuples, results_generator))

这适用于玩具示例。我现在必须弄清楚如何在实际应用案例中类似地组织我的计算。

我尝试使用高阶函数 (make_funcs_applier) 将其泛化以生成复合函数 (apply_funcs),但出现以下错误:

AttributeError: Can't pickle local object  'make_funcs_applier.<locals>.apply_funcs'

更普遍的尝试

根据 cmets 中的建议,我尝试改进上述解决方案以使其更可重用:

#!/usr/bin/env python3
"""This script tries to work around some limitations of multiprocessing."""

from itertools import repeat, starmap
from multiprocessing import Pool
from functools import reduce
from operator import add

# Doesn't work because local functions can't be pickled:
# def make_tuple_func(funcs):
#     def tuple_func(args_list):
#         return tuple(func(args) for func, args in zip(funcs, args_list))
#     return tuple_func
#
# test_tuple_func = make_tuple_func((plus_one, double, square))

class FuncApplier(object):
    """This kind of object can be used to group functions and call them on a
    tuple of arguments."""
    __slots__ = ("funcs", )

    def __init__(self, funcs):
        self.funcs = funcs

    def __len__(self):
        return len(self.funcs)

    def __call__(self, args_list):
        return tuple(func(args) for func, args in zip(self.funcs, args_list))

    def fork_args(self, args_list):
        """Takes an arguments list and repeat them in a n-tuple."""
        return tuple(repeat(args_list, len(self)))


def sum_tuples(*tuples):
    """Element-wise sum of tuple items."""
    return tuple(starmap(add, zip(*tuples)))


# Can't define these functions in main:
# They wouldn't be pickleable.
def plus_one(x):
    return x + 1

def double(x):
    return 2 * x

def square(x):
    return x * x

def main():
    def my_generator():
        for i in range(5):
            print(i)
            yield i


    test_tuple_func = FuncApplier((plus_one, double, square))

    with Pool(processes=5) as pool:
        results_generator = pool.imap_unordered(
            test_tuple_func,
            (test_tuple_func.fork_args(args_list) for args_list in my_generator()))
        print("sum of x+1:\t%s\nsum of 2*x:\t%s\nsum of x*x:\t%s" % reduce(
            sum_tuples, results_generator))
    exit(0)

if __name__ == "__main__":
    exit(main())

测试它:

$ ./test_fork.py 
0
1
2
3
4
sum of x+1: 15
sum of 2*x: 20
sum of x*x: 30

我仍然有一些烦人的限制,因为我倾向于经常在我的代码中定义本地函数。

【讨论】:

本地函数确实不可picklable;例如,您可以通过使用带有__call__ 的***类的实例来解决此问题。 感谢您的建议。我相应地更新了答案。 我试图将该技术应用于我的实际应用案例,除非我在 cProfile 下运行它,否则它似乎可以工作,在这种情况下,它无法腌制在我的顶层定义的相当正常的函数脚本:_pickle.PicklingError: Can't pickle &lt;function count_annot at 0x7f4c2ae91048&gt;: attribute lookup count_annot on __main__ failed.【参考方案2】:

multiprocessing 系统将您的主模块导入到它启动的每个进程中。因此模块代码在每个进程中执行。

您可以通过使用始终推荐的方法来避免这种情况

if __name__ == '__main__':

在你的类和函数定义之后,所以主程序的代码只在启动过程中运行。这应该是仅适用于 Windows 平台的要求,但可能值得一试,因为您抱怨代码运行两次。

【讨论】:

我刚试过,但这并不能阻止生成器明显重复:我仍然看到它的输出打印了两次。 哦,好吧……对不起。很高兴您似乎找到了解决此问题的方法。

以上是关于发球台发电机上的多处理的主要内容,如果未能解决你的问题,请参考以下文章

英国Fiddler's Ferry 2*500MW*20%生物质耦合发电案例

人体发电?已经触手可及!

怎样测试挖掘机的发电机是不是发电?

(MATLAB代码分享可运行)基于NSGA-2算法的多目标水电站电力调度优化

数采仪下生活垃圾焚烧发电厂烟气排放过程(工况)自动监控

光伏发电上网电量无线传输/4G能源数据集中器/能源数据采集终端