使用 ProcessPoolExecutor 时更新变量

Posted

技术标签:

【中文标题】使用 ProcessPoolExecutor 时更新变量【英文标题】:Update variable while working with ProcessPoolExecutor 【发布时间】:2020-03-21 05:53:14 【问题描述】:
if __name__ == '__main__':

    MATCH_ID = str(doc_ref2.id)

    MATCH_ID_TEAM = doc_ref3.id

    with concurrent.futures.ProcessPoolExecutor(max_workers=30) as executor:
        results = list(executor.map(ESPNlayerFree, teamList1))
    
    MATCH_ID_TEAM = str(doc_ref4.id)

    with concurrent.futures.ProcessPoolExecutor(max_workers=30) as executor:
        results = list(executor.map(ESPNlayerFree, teamList2))

当我打印MATCH_ID_TEAM 时,它会打印值。但在这个过程中,它显示了我在顶部设置为空的默认值。

如何将变量的值更新到所有进程?

ESPNPlayerFree is a class that takes `id` as an argument. So `teamList1` and `teamList2` are list of ids to initialize my objects.

MATCH_IDMATCH_ID_TEAM 是在我的类中使用的变量 ESPNPlayerFree

操作系统 Windows 10 64 位

IDE Pycharm

Python 版本 3.6.1

【问题讨论】:

什么变量?什么是teamList1?什么是 ESPNPlayerFree?您可以添加有关您的流程的更完整的代码吗? @ranifisch 更新解释了一切。我无法添加更多代码,因为已经没有更多代码了,只有 ESPNPlayerFree 这个巨大的类 所以你想将 MATCH_ID 和 MATCH_ID_TEAM 传递给进程并在进程中获取更新的值?我的意思是从您的“主要”更新它们并在流程中更新值? 是的,这正是我想要的工作 将列表 teamLIst1 转换为列表 [(MATCH_ID_TEAM, item1), (MATCH_ID_TEAM, item2), etc.] 并将此列表用于进程。每个进程都必须将参数解压缩到变量 - match_id_team, item = arg 【参考方案1】:

我想知道几天前@furas 在他的评论中留下的地方。最简单的方法确实是通过.map() 来传递你在课堂上需要的所有东西。 executor.map() 期待迭代,它被压缩到一个参数元组中,以便在您的工作人员中进行每个函数调用。

您显然需要MATCH_IDMATCH_ID_TEAM 才能在整个工作中保持不变,即一次 调用executor.map()。您面临的挑战是,这两个都是可迭代对象(字符串),但您需要将它们作为一个整体进行复制,并且通常足以与您的团队列表可迭代项中的每个项目相匹配。

因此,当您将这些字符串与团队 ID 列表一起传递给 .map() 时,您只需用 itertools.repeat() 包装这些字符串。 itertools.repeat() 默认返回传递对象的无限迭代器。 ProcessPoolExecutor 在内部使用 zip() 将所有可迭代项中的项目组合为参数。

import concurrent.futures
import multiprocessing
from itertools import repeat


class ESPNPlayerFree:
    def __init__(self, team_id, match_id, match_id_team):
        self.teams_id = team_id
        self.match_id = match_id
        self.match_id_team = match_id_team
        print(
            multiprocessing.current_process().name,
            self.teams_id, self.match_id, self.match_id_team
        )


if __name__ == '__main__':

    teams1 = [f"idi" for i in range (10)]
    teams2 = [f"idi" for i in range(10, 20)]

    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:

        MATCH_ID = 'doc_ref2.id'
        MATCH_ID_TEAM = 'doc_ref3.id'

        results = list(
            executor.map(
                ESPNPlayerFree,
                teams1,
                repeat(MATCH_ID),
                repeat(MATCH_ID_TEAM),
            )
        )

        print("--- new MATCH_ID_TEAM ---")
        MATCH_ID_TEAM = 'doc_ref4.id'

        results = list(
            executor.map(
                ESPNPlayerFree,
                teams2,
                repeat(MATCH_ID),
                repeat(MATCH_ID_TEAM),
            )
        )

输出:

ForkProcess-1 id0 doc_ref2.id doc_ref3.id
ForkProcess-2 id1 doc_ref2.id doc_ref3.id
ForkProcess-3 id2 doc_ref2.id doc_ref3.id
ForkProcess-4 id3 doc_ref2.id doc_ref3.id
ForkProcess-1 id4 doc_ref2.id doc_ref3.id
ForkProcess-3 id5 doc_ref2.id doc_ref3.id
ForkProcess-2 id6 doc_ref2.id doc_ref3.id
ForkProcess-4 id7 doc_ref2.id doc_ref3.id
ForkProcess-3 id8 doc_ref2.id doc_ref3.id
ForkProcess-1 id9 doc_ref2.id doc_ref3.id
--- new MATCH_ID_TEAM ---
ForkProcess-1 id10 doc_ref2.id doc_ref4.id
ForkProcess-3 id11 doc_ref2.id doc_ref4.id
ForkProcess-2 id12 doc_ref2.id doc_ref4.id
ForkProcess-4 id13 doc_ref2.id doc_ref4.id
ForkProcess-1 id14 doc_ref2.id doc_ref4.id
ForkProcess-3 id15 doc_ref2.id doc_ref4.id
ForkProcess-2 id16 doc_ref2.id doc_ref4.id
ForkProcess-4 id17 doc_ref2.id doc_ref4.id
ForkProcess-2 id18 doc_ref2.id doc_ref4.id
ForkProcess-1 id19 doc_ref2.id doc_ref4.id

Process finished with exit code 0

对于第二个工作,有了新的MATCH_ID_TEAM,您就不必再次重新创建ProcessPoolExecutor,只要您需要,您只需在上下文管理器中再次使用现有的。

【讨论】:

完全遵循但我收到此错误BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending. 为了清楚起见,我在@furas 解决方案之后也收到此错误。第一部分存储数据,然后抛出错误,永远不会到达第二部分 @johnrao07 您的操作系统正在杀死一个工作人员,在这种情况下,整个池将关闭。可能是您使用的内存过多,请尝试使用较少的工作人员并注意内存消耗。 尝试使用max_workers=4,同样的错误。我还尝试了一个只有 2 个项目的手动列表,同样的错误。它完成处理然后抛出错误 @johnrao07 你是否已经收到我未更改的示例代码的错误? @johnrao07 那不应该。您能否指定您的操作系统、运行代码的方式(哪个 IDE 或终端)、您使用的确切 Python 版本以及您是在服务器还是本地机器上运行代码。

以上是关于使用 ProcessPoolExecutor 时更新变量的主要内容,如果未能解决你的问题,请参考以下文章

在装饰器中使用 ProcessPoolExecutor 酸洗 Python 函数失败

ProcessPoolExecutor 和 Ctrl C

Python 中的 ProcessPoolExecutor 和 Lock

无法将动态类与 concurrent.futures.ProcessPoolExecutor 一起使用

使用 ProcessPoolExecutor 进行并行处理

在 Python 中使用 ProcessPoolExecutor 的运行调用数不正确