并发中的 Pybind11 并行处理问题::parallel_for

Posted

技术标签:

【中文标题】并发中的 Pybind11 并行处理问题::parallel_for【英文标题】:Pybind11 Parallel-Processing Issue in Concurrency::parallel_for 【发布时间】:2020-03-09 22:47:54 【问题描述】:

我有一个在矩阵上执行filtering 的python 代码。我使用pybind11 创建了一个C++ 接口,它以序列化方式成功运行(请参阅下面的代码)。

我正在尝试使其并行处理,以希望与其序列化版本相比减少计算时间。为此,我将大小为M×N 的数组拆分为三个大小为M×(N/3) 的子矩阵,以使用相同的接口并行处理它们。

我使用 ppl.h 库创建了一个并行 for 循环,并在每个循环中调用大小为 M×(N/3) 的子矩阵上的 python 函数。

#include <iostream>
#include <ppl.h>

#include "pybind11/embed.h"
#include <pybind11/iostream.h>
#include <pybind11/stl_bind.h>
#include "pybind11/eigen.h"
#include "pybind11/stl.h"
#include "pybind11/numpy.h"
#include "pybind11/functional.h"
#include <Eigen/Dense>

namespace py = pybind11;

class myClass

public:
    myClass()
    
        m_module = py::module::import("myFilterScript");
        m_handle = m_module.attr("medianFilter");
    ;

    void medianFilterSerialized(Eigen::Ref<Eigen::MatrixXf> input, int windowSize) 
    
        Eigen::MatrixXf output;
        output.resizeLike(input);
        output = m_handle(input, windowSize).cast<Eigen::MatrixXf>();
    ;

    void medianFilterParallelizedUsingPPL(Eigen::Ref<Eigen::MatrixXf> input, int windowSize) 
    
        Eigen::MatrixXf output;
        output.resizeLike(input);
        /* Acquire GIL before calling Python code */
        //py::gil_scoped_acquire acquire;
        Concurrency::parallel_for(size_t(0), size_t(3), [&](size_t i)
        
            output.block(0, i * input.cols() / 3, input.rows(), input.cols() / 3) = m_handle(input.block(0, i * input.cols() / 3, input.rows(), input.cols() / 3).array(), windowSize).cast<Eigen::MatrixXf>();
        );
        //py::gil_scoped_release release;
    ;

private:
    py::scoped_interpreter m_guard;
    py::module m_module;
    py::handle m_handle;
    py::object m_object;
;


int main()

    myClass c;

    Eigen::MatrixXf input = Eigen::MatrixXf::Random(240, 120);

    c.medianFilterSerialized(input, 3); 
    c.medianFilterParallelizedUsingPPL(input, 3);

    return 0;

myFilterScript.py:

import threading
import numpy as np
import bottleneck as bn # can be installed from https://pypi.org/project/Bottleneck/

def medianFilter(input, windowSize):
    return bn.move_median(input, window=windowSize, axis=0)

不管使用py::gil_scoped_acquire,我的代码在到达for循环时都会崩溃:

Access violation reading location // or:
Unhandled exception at 0x00007FF98BB8DB8E (ucrtbase.dll) in Pybind11_Parallelizing.exe: Fatal program exit requested.

是否有人可以帮助我了解 python 模块的加载函数是否可以以多处理或多线程方式并行调用?我的代码中缺少什么?请告诉我。提前致谢。

【问题讨论】:

【参考方案1】:

py::gil_scoped_acquire 是一个 RAII 对象,用于在一个范围内获取 GIL,类似地,py::gil_scoped_release 在“反向”RAII 中用于在一个范围内释放 GIL。因此,在相关范围内,您只需要前者。

获取 GIL 的范围在调用 Python 的函数上,因此在您传递给 parallel_for 的 lambda 中:执行的每个线程都需要持有 GIL 以访问任何 Python 对象或 API,在这种情况下m_handle。但是,在 lambda 中这样做会完全序列化代码,从而使线程的使用没有实际意义,因此它会因错误的原因解决您的问题。

这将是使用 pybind11 (https://pybind11.readthedocs.io/en/stable/advanced/embedding.html#sub-interpreter-support) 中没有直接支持的子解释器的情况,因此 C API 将是票证 (https://docs.python.org/3/c-api/init.html#c.Py_NewInterpreter)。重点是操作的数据是非Python的,所有操作原则上都是独立的。

但是,您需要知道Bottleneck 是否是线程安全的。粗略地看,它似乎没有全局/静态数据 AFAICT。理论上,还有一些并行化的空间:当调用move_median时,你需要持有GIL,当它进入用于绑定Bottleneck的Cython代码时(它将变量拆箱,从而调用Python API),然后Cython可以释放输入Bottleneck 的 C 代码时的 GIL 并在退出时重新获取,然后在 RAII 范围结束时在 lambda 中释放。然后 C 代码并行运行。

但是问题就变成了:为什么首先通过 Python 绑定从 C++ 调用 C 库?这里似乎是一个简单的解决方案:跳过 Python 并直接调用 move_median C 函数。

【讨论】:

谢谢,Wim,我会尽力实现你所描述的。关于您的问题,如果有意义的话,我想创建一个接口,以方便在 python 中为已经开发的 C++ 应用程序进行原型设计。我也可以知道你的想法,而不是执行在 C++ 中启动的并行处理,而是在 python 中执行?我试过joblib,但解释器停在Parallel(n_jobs=2, prefer='threads')(delayed(bn.move_median)(input[:,i*nCols:(i+1)*nCols], window=windowSize, axis=0) for i in range(3))nCols=int(input.shape[1]/3) 我不熟悉joblib,但根据文档,假设bn.move_median 在进入C 后释放GIL,上述语法将起作用。所以不,我不知道它为什么停止...您是否尝试在调试器中中断它以获取堆栈跟踪?除此之外,我唯一担心的是,我希望 input 的索引会生成一个副本,如果大小很大,这可能会出现问题(如果不是,并行化将毫无意义)。

以上是关于并发中的 Pybind11 并行处理问题::parallel_for的主要内容,如果未能解决你的问题,请参考以下文章

使用pybind11开发python扩展库

等效于 pybind11 中的 boost::python py::scope().attr()

在 pybind11 中引用 C++ 分配的对象

pybind11 保持对象活着

《数据库系统概论》 -- 11.并发机制

数据库 chapter 11 并发控制