RuntimeError on windows trying python multiprocessing

Posted: 2013-08-14 19:33:16

Question:

This is my first formal Python program, using threading and multiprocessing on a Windows machine. I am unable to launch the processes; Python gives the message below. The thing is, I am not launching my threads in the main module. The threads are handled in a separate module, inside a class.

Edit: By the way, this code runs fine on Ubuntu. Just not on Windows.

RuntimeError: 
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.
            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:
                if __name__ == '__main__':
                    freeze_support()
                    ...
            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.

My original code is rather long, but I was able to reproduce the error in a cut-down version. It is split into two files: the first is the main module, which does little more than import the module that handles the processes/threads and call a method. The second module is where the meat of the code lives.


testMain.py:

import parallelTestModule

extractor = parallelTestModule.ParallelExtractor()
extractor.runInParallel(numProcesses=2, numThreads=4)

parallelTestModule.py:

import multiprocessing
from multiprocessing import Process
import threading

class ThreadRunner(threading.Thread):
    """ This class represents a single instance of a running thread"""
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name
    def run(self):
        print(self.name)

class ProcessRunner:
    """ This class represents a single instance of a running process """
    def runp(self, pid, numThreads):
        mythreads = []
        for tid in range(numThreads):
            name = "Proc-"+str(pid)+"-Thread-"+str(tid)
            th = ThreadRunner(name)
            mythreads.append(th) 
        for i in mythreads:
            i.start()
        for i in mythreads:
            i.join()

class ParallelExtractor:    
    def runInParallel(self, numProcesses, numThreads):
        myprocs = []
        prunner = ProcessRunner()
        for pid in range(numProcesses):
            pr = Process(target=prunner.runp, args=(pid, numThreads)) 
            myprocs.append(pr) 
#        if __name__ == 'parallelTestModule':    # This didn't work
#        if __name__ == '__main__':              # This obviously doesn't work
#        multiprocessing.freeze_support()        # added after seeing the error, to no avail
        for i in myprocs:
            i.start()

        for i in myprocs:
            i.join()

Question comments:

@doctorlove I am running it as python testMain.py
Of course - you need an if name == 'main'; see the answers and the docs
@NGAlgo Your script was very helpful to me while I was debugging a pymongo and multiprocessing issue. Thanks!

Answer 1:

On Windows, the subprocess will import (that is, execute) the main module when it starts. You therefore need to insert an if __name__ == '__main__': guard in the main module to avoid creating subprocesses recursively.

Modified testMain.py:

import parallelTestModule

if __name__ == '__main__':    
    extractor = parallelTestModule.ParallelExtractor()
    extractor.runInParallel(numProcesses=2, numThreads=4)
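The same fix can be seen end to end in a minimal, self-contained sketch (standalone code, not the OP's modules): all process start-up happens under the guard, so a Windows child that re-imports the file does not spawn processes of its own.

```python
# Minimal sketch of the guarded pattern: the worker is importable at module
# level, but processes are only created when the file runs as a script.
import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    with mp.Pool(processes=2) as pool:
        results = pool.map(square, range(5))
    print(results)  # [0, 1, 4, 9, 16]
```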

Comments:

(slaps forehead) Doh! It works!!! Thank you so much! I had missed the fact that it is the original main module that gets re-imported. All this time I was trying the "name ==" check right before launching my processes.
I can't seem to import 'parallelTestModule'. I'm using Python 2.7. Should it work out of the box?
@Jonny The code for parallelTestModule.py is part of the question.
@DeshDeepSingh The code snippet is not a stand-alone example; it is a modification of the OP's code
@DeshDeepSingh That module is part of the question.

Answer 2:

Try putting your code inside a main function in testMain.py:

import parallelTestModule

if __name__ ==  '__main__':
  extractor = parallelTestModule.ParallelExtractor()
  extractor.runInParallel(numProcesses=2, numThreads=4)

See the docs:

"For an explanation of why (on Windows) the if __name__ == '__main__' 
part is necessary, see Programming guidelines."

"Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process)."

...by using if __name__ == '__main__'
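The traceback also mentions freeze_support(). As a hedged sketch of where that call belongs: it only matters once the script is frozen into a Windows executable (for example with PyInstaller), and is a harmless no-op otherwise.

```python
import multiprocessing

def worker(q):
    q.put('hello from child')

if __name__ == '__main__':
    # Only needed when the program is frozen into a Windows .exe;
    # when frozen, it should be the first statement under the guard.
    multiprocessing.freeze_support()
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    message = q.get()   # drain the queue before join() to avoid a feeder deadlock
    p.join()
    print(message)  # hello from child
```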

Comments:

Answer 3:

While the previous answers are correct, there is a small subtlety worth pointing out.

If your main module imports another module in which global or class-member variables are defined and initialized to (or using) some new objects, you may have to guard that import in the same way:

if __name__ ==  '__main__':
  import my_module

Comments:

Answer 4:

As @Ofer said, when you use other libraries or modules, you should import them all inside if __name__ == '__main__':

So, in my case, it ended up like this:

if __name__ == '__main__':       
    import librosa
    import os
    import pandas as pd
    run_my_program()

Comments:

Answer 5:

Hi, here is my structure for multiprocessing:

from multiprocessing import Process
import time


start = time.perf_counter()


def do_something(time_for_sleep):
    print(f'Sleeping {time_for_sleep} second...')
    time.sleep(time_for_sleep)
    print('Done Sleeping...')



p1 = Process(target=do_something, args=[1])
p2 = Process(target=do_something, args=[2])


if __name__ == '__main__':
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    finish = time.perf_counter()
    print(f'Finished in {round(finish - start, 2)} second(s)')

You don't have to put the imports inside if __name__ == '__main__':, just put the code you want to run inside it.

Comments:

Answer 6:

In my case it was a simple bug in the code: a variable was used before it was created. Worth checking that before trying the solutions above. Why I got this particular error message, God only knows.

Comments:

Answer 7:

The following solution should work for both python multiprocessing and pytorch multiprocessing.

As the other answers mention, the fix is to use if __name__ == '__main__':, but I faced several issues in identifying where to start, because I am using several scripts and modules. As soon as I could call my first function inside main, everything before it started creating multiple processes (not sure why).

Putting it at the very first line (even before the imports) worked. Calling only the first function returned a timeout error. The code below is the first file of my project; multiprocessing is used after calling several functions, but putting main first seems to be the only fix here.

if __name__ == '__main__':
    from mjrl.utils.gym_env import GymEnv
    from mjrl.policies.gaussian_mlp import MLP
    from mjrl.baselines.quadratic_baseline import QuadraticBaseline
    from mjrl.baselines.mlp_baseline import MLPBaseline
    from mjrl.algos.npg_cg import NPG
    from mjrl.algos.dapg import DAPG
    from mjrl.algos.behavior_cloning import BC
    from mjrl.utils.train_agent import train_agent
    from mjrl.samplers.core import sample_paths
    import os
    import json
    import mjrl.envs
    import mj_envs
    import time as timer
    import pickle
    import argparse

    import numpy as np 

    # ===============================================================================
    # Get command line arguments
    # ===============================================================================

    parser = argparse.ArgumentParser(description='Policy gradient algorithms with demonstration data.')
    parser.add_argument('--output', type=str, required=True, help='location to store results')
    parser.add_argument('--config', type=str, required=True, help='path to config file with exp params')
    args = parser.parse_args()
    JOB_DIR = args.output
    if not os.path.exists(JOB_DIR):
        os.mkdir(JOB_DIR)
    with open(args.config, 'r') as f:
        job_data = eval(f.read())
    assert 'algorithm' in job_data.keys()
    assert any([job_data['algorithm'] == a for a in ['NPG', 'BCRL', 'DAPG']])
    job_data['lam_0'] = 0.0 if 'lam_0' not in job_data.keys() else job_data['lam_0']
    job_data['lam_1'] = 0.0 if 'lam_1' not in job_data.keys() else job_data['lam_1']
    EXP_FILE = JOB_DIR + '/job_config.json'
    with open(EXP_FILE, 'w') as f:
        json.dump(job_data, f, indent=4)

    # ===============================================================================
    # Train Loop
    # ===============================================================================

    e = GymEnv(job_data['env'])
    policy = MLP(e.spec, hidden_sizes=job_data['policy_size'], seed=job_data['seed'])
    baseline = MLPBaseline(e.spec, reg_coef=1e-3, batch_size=job_data['vf_batch_size'],
                           epochs=job_data['vf_epochs'], learn_rate=job_data['vf_learn_rate'])

    # Get demonstration data if necessary and behavior clone
    if job_data['algorithm'] != 'NPG':
        print("========================================")
        print("Collecting expert demonstrations")
        print("========================================")
        demo_paths = pickle.load(open(job_data['demo_file'], 'rb'))

        ########################################################################################
        demo_paths = demo_paths[0:3]
        print (job_data['demo_file'], len(demo_paths))
        for d in range(len(demo_paths)):
            feats = demo_paths[d]['features']
            feats = np.vstack(feats)
            demo_paths[d]['observations'] = feats

        ########################################################################################

        bc_agent = BC(demo_paths, policy=policy, epochs=job_data['bc_epochs'], batch_size=job_data['bc_batch_size'],
                      lr=job_data['bc_learn_rate'], loss_type='MSE', set_transforms=False)

        in_shift, in_scale, out_shift, out_scale = bc_agent.compute_transformations()
        bc_agent.set_transformations(in_shift, in_scale, out_shift, out_scale)
        bc_agent.set_variance_with_data(out_scale)

        ts = timer.time()
        print("========================================")
        print("Running BC with expert demonstrations")
        print("========================================")
        bc_agent.train()
        print("========================================")
        print("BC training complete !!!")
        print("time taken = %f" % (timer.time() - ts))
        print("========================================")

        # if job_data['eval_rollouts'] >= 1:
        #     score = e.evaluate_policy(policy, num_episodes=job_data['eval_rollouts'], mean_action=True)
        #     print("Score with behavior cloning = %f" % score[0][0])

    if job_data['algorithm'] != 'DAPG':
        # We throw away the demo data when training from scratch or fine-tuning with RL without explicit augmentation
        demo_paths = None

    # ===============================================================================
    # RL Loop
    # ===============================================================================

    rl_agent = DAPG(e, policy, baseline, demo_paths,
                    normalized_step_size=job_data['rl_step_size'],
                    lam_0=job_data['lam_0'], lam_1=job_data['lam_1'],
                    seed=job_data['seed'], save_logs=True
                    )

    print("========================================")
    print("Starting reinforcement learning phase")
    print("========================================")


    ts = timer.time()
    train_agent(job_name=JOB_DIR,
                agent=rl_agent,
                seed=job_data['seed'],
                niter=job_data['rl_num_iter'],
                gamma=job_data['rl_gamma'],
                gae_lambda=job_data['rl_gae'],
                num_cpu=job_data['num_cpu'],
                sample_mode='trajectories',
                num_traj=job_data['rl_num_traj'],
                num_samples= job_data['rl_num_samples'],
                save_freq=job_data['save_freq'],
                evaluation_rollouts=job_data['eval_rollouts'])
    print("time taken = %f" % (timer.time()-ts))

Comments:
