使用 ProcessPoolExecutor 时更新变量
Posted
技术标签:
【中文标题】使用 ProcessPoolExecutor 时更新变量【英文标题】:Update variable while working with ProcessPoolExecutor 【发布时间】:2020-03-21 05:53:14 【问题描述】:if __name__ == '__main__':
MATCH_ID = str(doc_ref2.id)
MATCH_ID_TEAM = doc_ref3.id
with concurrent.futures.ProcessPoolExecutor(max_workers=30) as executor:
results = list(executor.map(ESPNlayerFree, teamList1))
MATCH_ID_TEAM = str(doc_ref4.id)
with concurrent.futures.ProcessPoolExecutor(max_workers=30) as executor:
results = list(executor.map(ESPNlayerFree, teamList2))
当我打印MATCH_ID_TEAM
时,它会打印值。但在这个过程中,它显示了我在顶部设置为空的默认值。
如何将变量的值更新到所有进程?
ESPNPlayerFree is a class that takes `id` as an argument. So `teamList1` and `teamList2` are list of ids to initialize my objects.
MATCH_ID
和 MATCH_ID_TEAM
是在我的类中使用的变量 ESPNPlayerFree
操作系统 Windows 10 64 位
IDE Pycharm
Python 版本 3.6.1
【问题讨论】:
什么变量?什么是teamList1?什么是 ESPNPlayerFree?您可以添加有关您的流程的更完整的代码吗? @ranifisch 更新解释了一切。我无法添加更多代码,因为已经没有更多代码了,只有ESPNPlayerFree
这个巨大的类
所以你想将 MATCH_ID 和 MATCH_ID_TEAM 传递给进程并在进程中获取更新的值?我的意思是从您的“主要”更新它们并在流程中更新值?
是的,这正是我想要的工作
将列表 teamLIst1
转换为列表 [(MATCH_ID_TEAM, item1), (MATCH_ID_TEAM, item2), etc.]
并将此列表用于进程。每个进程都必须将参数解压缩到变量 - match_id_team, item = arg
【参考方案1】:
我想知道几天前@furas 在他的评论中留下的地方。最简单的方法确实是通过.map()
来传递你在课堂上需要的所有东西。 executor.map()
期待迭代,它被压缩到一个参数元组中,以便在您的工作人员中进行每个函数调用。
您显然需要MATCH_ID
和MATCH_ID_TEAM
才能在整个工作中保持不变,即一次 调用executor.map()
。您面临的挑战是,这两个都是可迭代对象(字符串),但您需要将它们作为一个整体进行复制,并且通常足以与您的团队列表可迭代项中的每个项目相匹配。
因此,当您将这些字符串与团队 ID 列表一起传递给 .map()
时,您只需用 itertools.repeat()
包装这些字符串。 itertools.repeat()
默认返回传递对象的无限迭代器。 ProcessPoolExecutor
在内部使用 zip()
将所有可迭代项中的项目组合为参数。
import concurrent.futures
import multiprocessing
from itertools import repeat
class ESPNPlayerFree:
def __init__(self, team_id, match_id, match_id_team):
self.teams_id = team_id
self.match_id = match_id
self.match_id_team = match_id_team
print(
multiprocessing.current_process().name,
self.teams_id, self.match_id, self.match_id_team
)
if __name__ == '__main__':
teams1 = [f"idi" for i in range (10)]
teams2 = [f"idi" for i in range(10, 20)]
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
MATCH_ID = 'doc_ref2.id'
MATCH_ID_TEAM = 'doc_ref3.id'
results = list(
executor.map(
ESPNPlayerFree,
teams1,
repeat(MATCH_ID),
repeat(MATCH_ID_TEAM),
)
)
print("--- new MATCH_ID_TEAM ---")
MATCH_ID_TEAM = 'doc_ref4.id'
results = list(
executor.map(
ESPNPlayerFree,
teams2,
repeat(MATCH_ID),
repeat(MATCH_ID_TEAM),
)
)
输出:
ForkProcess-1 id0 doc_ref2.id doc_ref3.id
ForkProcess-2 id1 doc_ref2.id doc_ref3.id
ForkProcess-3 id2 doc_ref2.id doc_ref3.id
ForkProcess-4 id3 doc_ref2.id doc_ref3.id
ForkProcess-1 id4 doc_ref2.id doc_ref3.id
ForkProcess-3 id5 doc_ref2.id doc_ref3.id
ForkProcess-2 id6 doc_ref2.id doc_ref3.id
ForkProcess-4 id7 doc_ref2.id doc_ref3.id
ForkProcess-3 id8 doc_ref2.id doc_ref3.id
ForkProcess-1 id9 doc_ref2.id doc_ref3.id
--- new MATCH_ID_TEAM ---
ForkProcess-1 id10 doc_ref2.id doc_ref4.id
ForkProcess-3 id11 doc_ref2.id doc_ref4.id
ForkProcess-2 id12 doc_ref2.id doc_ref4.id
ForkProcess-4 id13 doc_ref2.id doc_ref4.id
ForkProcess-1 id14 doc_ref2.id doc_ref4.id
ForkProcess-3 id15 doc_ref2.id doc_ref4.id
ForkProcess-2 id16 doc_ref2.id doc_ref4.id
ForkProcess-4 id17 doc_ref2.id doc_ref4.id
ForkProcess-2 id18 doc_ref2.id doc_ref4.id
ForkProcess-1 id19 doc_ref2.id doc_ref4.id
Process finished with exit code 0
对于第二个工作,有了新的MATCH_ID_TEAM
,您就不必再次重新创建ProcessPoolExecutor
,只要您需要,您只需在上下文管理器中再次使用现有的。
【讨论】:
完全遵循但我收到此错误BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
为了清楚起见,我在@furas 解决方案之后也收到此错误。第一部分存储数据,然后抛出错误,永远不会到达第二部分
@johnrao07 您的操作系统正在杀死一个工作人员,在这种情况下,整个池将关闭。可能是您使用的内存过多,请尝试使用较少的工作人员并注意内存消耗。
尝试使用max_workers=4
,同样的错误。我还尝试了一个只有 2 个项目的手动列表,同样的错误。它完成处理然后抛出错误
@johnrao07 你是否已经收到我未更改的示例代码的错误?
@johnrao07 那不应该。您能否指定您的操作系统、运行代码的方式(哪个 IDE 或终端)、您使用的确切 Python 版本以及您是在服务器还是本地机器上运行代码。以上是关于使用 ProcessPoolExecutor 时更新变量的主要内容,如果未能解决你的问题,请参考以下文章
在装饰器中使用 ProcessPoolExecutor 酸洗 Python 函数失败
Python 中的 ProcessPoolExecutor 和 Lock