并发中的 Pybind11 并行处理问题::parallel_for
Posted
技术标签:
【中文标题】并发中的 Pybind11 并行处理问题::parallel_for【英文标题】:Pybind11 Parallel-Processing Issue in Concurrency::parallel_for 【发布时间】:2020-03-09 22:47:54 【问题描述】:我有一个在矩阵上执行filtering 的python 代码。我使用pybind11
创建了一个C++ 接口,它以序列化方式成功运行(请参阅下面的代码)。
我正在尝试使其并行处理,以希望与其序列化版本相比减少计算时间。为此,我将大小为M×N
的数组拆分为三个大小为M×(N/3)
的子矩阵,以使用相同的接口并行处理它们。
我使用 ppl.h
库创建了一个并行 for 循环,并在每个循环中调用大小为 M×(N/3)
的子矩阵上的 python 函数。
#include <iostream>
#include <ppl.h>
#include "pybind11/embed.h"
#include <pybind11/iostream.h>
#include <pybind11/stl_bind.h>
#include "pybind11/eigen.h"
#include "pybind11/stl.h"
#include "pybind11/numpy.h"
#include "pybind11/functional.h"
#include <Eigen/Dense>
namespace py = pybind11;
class myClass
public:
myClass()
m_module = py::module::import("myFilterScript");
m_handle = m_module.attr("medianFilter");
;
void medianFilterSerialized(Eigen::Ref<Eigen::MatrixXf> input, int windowSize)
Eigen::MatrixXf output;
output.resizeLike(input);
output = m_handle(input, windowSize).cast<Eigen::MatrixXf>();
;
void medianFilterParallelizedUsingPPL(Eigen::Ref<Eigen::MatrixXf> input, int windowSize)
Eigen::MatrixXf output;
output.resizeLike(input);
/* Acquire GIL before calling Python code */
//py::gil_scoped_acquire acquire;
Concurrency::parallel_for(size_t(0), size_t(3), [&](size_t i)
output.block(0, i * input.cols() / 3, input.rows(), input.cols() / 3) = m_handle(input.block(0, i * input.cols() / 3, input.rows(), input.cols() / 3).array(), windowSize).cast<Eigen::MatrixXf>();
);
//py::gil_scoped_release release;
;
private:
py::scoped_interpreter m_guard;
py::module m_module;
py::handle m_handle;
py::object m_object;
;
int main()
myClass c;
Eigen::MatrixXf input = Eigen::MatrixXf::Random(240, 120);
c.medianFilterSerialized(input, 3);
c.medianFilterParallelizedUsingPPL(input, 3);
return 0;
myFilterScript.py
:
import threading
import numpy as np
import bottleneck as bn # can be installed from https://pypi.org/project/Bottleneck/
def medianFilter(input, windowSize):
return bn.move_median(input, window=windowSize, axis=0)
不管使用py::gil_scoped_acquire
,我的代码在到达for循环时都会崩溃:
Access violation reading location // or:
Unhandled exception at 0x00007FF98BB8DB8E (ucrtbase.dll) in Pybind11_Parallelizing.exe: Fatal program exit requested.
是否有人可以帮助我了解 python 模块的加载函数是否可以以多处理或多线程方式并行调用?我的代码中缺少什么?请告诉我。提前致谢。
【问题讨论】:
【参考方案1】:py::gil_scoped_acquire
是一个 RAII 对象,用于在一个范围内获取 GIL,类似地,py::gil_scoped_release
在“反向”RAII 中用于在一个范围内释放 GIL。因此,在相关范围内,您只需要前者。
获取 GIL 的范围在调用 Python 的函数上,因此在您传递给 parallel_for
的 lambda 中:执行的每个线程都需要持有 GIL 以访问任何 Python 对象或 API,在这种情况下m_handle
。但是,在 lambda 中这样做会完全序列化代码,从而使线程的使用没有实际意义,因此它会因错误的原因解决您的问题。
这将是使用 pybind11 (https://pybind11.readthedocs.io/en/stable/advanced/embedding.html#sub-interpreter-support) 中没有直接支持的子解释器的情况,因此 C API 将是票证 (https://docs.python.org/3/c-api/init.html#c.Py_NewInterpreter)。重点是操作的数据是非Python的,所有操作原则上都是独立的。
但是,您需要知道Bottleneck
是否是线程安全的。粗略地看,它似乎没有全局/静态数据 AFAICT。理论上,还有一些并行化的空间:当调用move_median
时,你需要持有GIL,当它进入用于绑定Bottleneck
的Cython代码时(它将变量拆箱,从而调用Python API),然后Cython可以释放输入Bottleneck
的 C 代码时的 GIL 并在退出时重新获取,然后在 RAII 范围结束时在 lambda 中释放。然后 C 代码并行运行。
但是问题就变成了:为什么首先通过 Python 绑定从 C++ 调用 C 库?这里似乎是一个简单的解决方案:跳过 Python 并直接调用 move_median
C 函数。
【讨论】:
谢谢,Wim,我会尽力实现你所描述的。关于您的问题,如果有意义的话,我想创建一个接口,以方便在 python 中为已经开发的 C++ 应用程序进行原型设计。我也可以知道你的想法,而不是执行在 C++ 中启动的并行处理,而是在 python 中执行?我试过joblib
,但解释器停在Parallel(n_jobs=2, prefer='threads')(delayed(bn.move_median)(input[:,i*nCols:(i+1)*nCols], window=windowSize, axis=0) for i in range(3))
nCols=int(input.shape[1]/3)
我不熟悉joblib
,但根据文档,假设bn.move_median
在进入C 后释放GIL,上述语法将起作用。所以不,我不知道它为什么停止...您是否尝试在调试器中中断它以获取堆栈跟踪?除此之外,我唯一担心的是,我希望 input
的索引会生成一个副本,如果大小很大,这可能会出现问题(如果不是,并行化将毫无意义)。以上是关于并发中的 Pybind11 并行处理问题::parallel_for的主要内容,如果未能解决你的问题,请参考以下文章