如何将 dict 参数传递给 Python Pool.apply_async 方法?

Posted

技术标签:

【中文标题】如何将 dict 参数传递给 Python Pool.apply_async 方法?【英文标题】:How to pass a dict argument to Python Pool.apply_async method? 【发布时间】:2021-12-29 04:15:58 【问题描述】:

我需要将 dict 传递给 Python Pool.apply_async。它似乎没有按预期工作,因为我的脚本卡住了。

import multiprocessing as mp

def my_func(session, index):
    result =  "server": session['server'], "exit_code": session['exit_code'],"index": index   
    return result
        
def my_callback(result):
    print(result)

pool = mp.Pool(5)

sessions = []
sessions.append("server": "foo.tld", "exit_code": 1)
sessions.append("server": "bar.tld", "exit_code": 0)

for i, session in enumerate(sessions):
    # Below, "session" argument is a dict
    pool.apply_async(my_func, kwds= "session": session, "index": i, callback=my_callback)
        
pool.close()
pool.join() 

【问题讨论】:

你所说的“卡住”是什么意思。你在哪个平台上运行? 【参考方案1】:

您需要保留从apply_async 返回的AsyncResult 对象,并调用其get 方法,如下所示:

for i, session in enumerate(sessions):
    res = pool.apply_async(my_func,  kwds= "session": session, "index": i, callback=my_callback)
    print(res.get())

输出应该是这样的:

'server': 'foo.tld', 'exit_code': 1, 'index': 0
'server': 'foo.tld', 'exit_code': 1, 'index': 0
'server': 'bar.tld', 'exit_code': 0, 'index': 1
'server': 'bar.tld', 'exit_code': 0, 'index': 1

查看文档中的example

【讨论】:

谢谢,但这并不能解决问题。问题在于“会话”参数,这是一个未正确处理的字典。如果我只将“index”arg(整数)传递给“test_func”,脚本就不再卡住了。也许我可能会尝试序列化/反序列化“会话”参数,但首先想避免这种情况。【参考方案2】:

这个简化版(withimap_unordered)对我来说很好用。

永远记得将主要的多处理入口点包装在一个函数中。

import multiprocessing as mp


def my_func(arg):
    index, session = arg
    result = 
        "server": session["server"],
        "exit_code": session["exit_code"],
        "index": index,
    
    return result


def main():
    sessions = [
        "server": "foo.tld", "exit_code": 1,
        "server": "bar.tld", "exit_code": 0,
    ]
    with mp.Pool(5) as pool:

        for res in pool.imap_unordered(my_func, enumerate(sessions)):
            print(res)


if __name__ == "__main__":
    main()

【讨论】:

谢谢。您能否详细说明为什么使用“imap_unordered”而不是 apply_async 以及为什么将入口点包装在一个函数(又名“main”)中?另请注意,我尝试通过序列化/反序列化传递给 apply_async 的“会话”参数来修改我的代码并且它有效。 因为imap_unordered 为您完成了入队工作和批量接收结果的工作。至于为什么main,因为the manual tells you to。【参考方案3】:

您所说的只是“我的脚本卡住了”,这不是很具描述性。您也没有使用您正在运行的平台标记您的问题,例如linuxwindows,正如您在发布带有multiprocessing 标记的问题时应该做的那样。这让我猜测您的问题来自在 Windows 上运行。如果是这样,那么问题在于 Windows 使用一种名为 spawn 的方法来创建新进程。这意味着在多处理池中创建进程并调用您的工作函数my_func,创建一个新的空地址空间并启动一个新的 Python 解释器,该解释器通过重新读取您的源程序在全局执行每个语句来初始化该进程范围。

所以全局范围内的所有导入语句、函数定义、数据声明、可执行语句等都会被执行。新创建的进程的唯一区别是,虽然在主进程内部变量__name__ 的值是'__main__',但对于这些子进程它不会有这个值。这允许您将不希望通过子进程初始化执行的任何语句放置在测试__name__ 值的块中。这样的语句将是创建子流程的实际可执行语句。如果您不将这些语句放在 if __name__ == '__main__': 块中,那么您将进入一个递归循环,创建新进程无限(实际上 Python 会识别这种情况并引发异常)。

因此,通常您将创建新进程的代码放置在诸如 main 之类的函数中(选择您想要的任何名称)并确保仅根据 __name__ 的值有条件地调用 main

if __name__ == '__main__':
    main()

或者您可以将进程创建代码保留在全局范围内,但在 if __name__ == '__main__': 块内:

import multiprocessing as mp

def my_func(session, index):
    result =  "server": session['server'], "exit_code": session['exit_code'],"index": index 
    return result

def my_callback(result):
    print(result)

if __name__ == '__main__':
    pool = mp.Pool(5)

    sessions = []
    sessions.append("server": "foo.tld", "exit_code": 1)
    sessions.append("server": "bar.tld", "exit_code": 0)

    for i, session in enumerate(sessions):
        # Below, "session" argument is a dict
        pool.apply_async(my_func, kwds= "session": session, "index": i, callback=my_callback)

    pool.close()
    pool.join()

打印:

'server': 'foo.tld', 'exit_code': 1, 'index': 0
'server': 'bar.tld', 'exit_code': 0, 'index': 1

注意我还放了all可执行语句,比如sessions列表的创建,为了效率,if __name__ == '__main__':内的子进程不需要执行。 p>

不过,如下代码更“简洁”:

import multiprocessing as mp

def my_func(session, index):
    result =  "server": session['server'], "exit_code": session['exit_code'],"index": index 
    return result

def my_callback(result):
    print(result)

def main():
    pool = mp.Pool(5)

    sessions = []
    sessions.append("server": "foo.tld", "exit_code": 1)
    sessions.append("server": "bar.tld", "exit_code": 0)

    for i, session in enumerate(sessions):
        # Below, "session" argument is a dict
        pool.apply_async(my_func, kwds= "session": session, "index": i, callback=my_callback)

    pool.close()
    pool.join()
    
if __name__ == '__main__':
    main()

【讨论】:

以上是关于如何将 dict 参数传递给 Python Pool.apply_async 方法?的主要内容,如果未能解决你的问题,请参考以下文章

将字典作为关键字参数传递给函数

通过 add_job_flow_steps 将嵌套字典传递给 EMR

Python:如何将参数传递给线程作业(回调)

Python:如何将多个参数传递给属性 getter?

如何将附加参数传递给自定义 python 排序函数

在 Eclipse(Pydev) 中将命令行参数传递给 Python 脚本