将带有对象的函数传递给 concurrent.futures.ProcessPoolExecutor()?
Posted
技术标签:
【中文标题】将带有对象的函数传递给 concurrent.futures.ProcessPoolExecutor()?【英文标题】:Pass function with objects into concurrent.futures.ProcessPoolExecutor()? 【发布时间】:2020-11-08 11:42:45 【问题描述】:需要帮助将对象传递给cpu_bound
函数。该程序同时使用 asyncio 和 multiprocessing,所以如果您两者都知道,那将是最好的帮助!
基本上问题出在:result = loop.run_in_executor(pool, lambda: cpu_bound(list1, list2, int_var)
我无法将 lambda 函数传递到池中,并且程序错误:_pickle.PicklingError: Can't pickle <function <lambda> at 0x00000230FDEDD700>: attribute lookup <lambda> on __main__ failed
这是我的程序的模拟结构,因为整个程序有 2000 多行代码:
import ...
# Defining some functions...
.
def cpu_bound(list1, list2, int_var):
# Some CPU-bound calculations...
.
async def find_trades(session, list3, list4):
# Some async function calls
.
with concurrent.futures.ProcessPoolExecutor() as pool:
result = loop.run_in_executor(
pool, dill.loads(dill.dumps(lambda: cpu_bound(list1, list2, int_var)))
try:
await asyncio.wait_for(
result, timeout=5
)
except asyncio.TimeoutError:
print("Took to long to compute!")
async def run():
# Some async function calls
.
await asyncio.gather(find_trades(session, list3, list4), ...)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
不幸的是,我对多处理相对较新,可能不知道很多关于将对象从主程序循环传递到它的多处理部分所带来的限制。
非常感谢所有帮助!
【问题讨论】:
【参考方案1】:下面一行:
loop.run_in_executor(
pool, dill.loads(dill.dumps(lambda: cpu_bound(list1, list2, int_var)))
似乎没有多大意义。您正在使用dill
序列化一个 lambda,但是在它有机会转移到子进程之前,您会立即将其反序列化回一个 lambda。这可能就是为什么您在尝试使用 dill 时从 pickle 中收到错误的原因。
但您可以通过将预期的参数作为位置参数传递给run_in_executor
,首先避免使用 lambda:
result = loop.run_in_executor(pool, cpu_bound, list1, list2, int_var)
如果列表包含可腌制对象,这应该可以正常工作。
【讨论】:
以上是关于将带有对象的函数传递给 concurrent.futures.ProcessPoolExecutor()?的主要内容,如果未能解决你的问题,请参考以下文章
将带有预定义参数的函数传递给 React 中的 handleSubmit