multiprocessing.Pool.map raises MemoryError
I am rewriting a reinforcement learning framework from serial code execution to parallel (multiprocessing) to reduce training time. It works, but after a few hours of training a MemoryError is thrown. I tried adding gc.collect() after every loop iteration, but nothing changed.

Here is the for loop that uses multiprocessing:
```python
for episode in episodes:
    env.episode = episode
    flex_list = [0, 1, 2]
    for machine in env.list_of_machines:
        flex_plan = []
        for time_step in range(0, env.steplength):
            flex_plan.append(random.choice(flex_list))
        machine.flex_plan = flex_plan
    env.current_step = 0
    steps = []
    state = env.reset(restricted=True)
    steps.append(state)
    # multiprocessing part, has a condition to use a specific number of CPUs or 'all' of them
    ####################################################
    func_part = partial(parallel_pool, episode=episode, episodes=episodes, env=env, agent=agent, state=state, log_data_qvalues=log_data_qvalues, log_data=log_data, steps=steps)
    if CPUs_used == 'all':
        mp.Pool().map(func_part, range(env.steplength - 1))
    else:
        mp.Pool(CPUs_used).map(func_part, range(env.steplength - 1))
    ############################################################
    # model is saved periodically, not only at the end
    save_interval = 100  # episode interval at which to save models
    if (episode + 1) % save_interval == 0:
        agent.save_model(f'models/model_{filename}_{episode + 1}')
        print(f'model saved at episode {episode + 1}')
    plt.close()
    gc.collect()
```
Output after 26 episodes of training:
```
Episode: 26/100 Action: 1/11 Phase: 3/3 Measurement Count: 231/234 THD fake slack: 0.09487 Psoll: [0.02894068 0.00046048 0. 0. ] Laptime: 0.181
Episode: 26/100 Action: 1/11 Phase: 3/3 Measurement Count: 232/234 THD fake slack: 0.09488 Psoll: [0.02894068 0.00046048 0. 0. ] Laptime: 0.181
Episode: 26/100 Action: 1/11 Phase: 3/3 Measurement Count: 233/234 THD fake slack: 0.09489 Psoll: [0.02894068 0.00046048 0. 0. ] Laptime: 0.179
Traceback (most recent call last):
  File "C:/Users/Artur/Desktop/RL_framework/train.py", line 87, in <module>
    main()
  File "C:/Users/Artur/Desktop/RL_framework/train.py", line 77, in main
    duration = cf.training(episodes, env, agent, filename, topology=topology, multi_processing=multi_processing, CPUs_used=CPUs_used)
  File "C:\Users\Artur\Desktop\RL_framework\help_functions\custom_functions.py", line 166, in training
    save_interval = parallel_training(range(episodes), env, agent, log_data_qvalues, log_data, filename, CPUs_used)
  File "C:\Users\Artur\Desktop\RL_framework\help_functions\custom_functions.py", line 81, in parallel_training
    mp.Pool().map(func_part, range(env.steplength-1))
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\pool.py", line 657, in get
    raise self._value
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\pool.py", line 431, in _handle_tasks
    put(task)
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
MemoryError
```
Is there a way to fix this?
I believe your memory leaks because the pools you create inside the loop are never cleaned up: each iteration spawns a fresh set of worker processes that linger after they finish running. The multiprocessing documentation warns about exactly this:

> multiprocessing.pool objects have internal resources that need to be properly managed (like any other resource) by using the pool as a context manager or by calling close() and terminate() manually. Failure to do this can lead to the process hanging on finalization. Note that it is not correct to rely on the garbage collector to destroy the pool as CPython does not assure that the finalizer of the pool will be called (see object.__del__() for more information).
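A minimal, self-contained sketch of the difference (`square` and the loop counts are placeholders, not the question's code):

```python
import multiprocessing as mp

def square(x):  # hypothetical stand-in for the real worker function
    return x * x

if __name__ == '__main__':
    # Leaky pattern: a fresh, never-closed Pool per iteration. The worker
    # processes and their pipes are only reclaimed if the garbage collector
    # happens to finalize the pool, which CPython does not guarantee.
    for _ in range(2):
        mp.Pool(2).map(square, range(4))

    # Fixed pattern: the context manager terminates the pool on exit,
    # so each iteration releases its workers deterministically.
    for _ in range(2):
        with mp.Pool(2) as p:
            results = p.map(square, range(4))
    print(results)  # [0, 1, 4, 9]
```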
I suggest you try refactoring your code slightly:
```python
# set CPUs_used to a desired number, or None to use all available CPUs
with mp.Pool(processes=CPUs_used) as p:
    p.map(func_part, range(env.steplength - 1))
```
Alternatively, you can call .close() and .join() manually, whichever fits your coding style best.