使用 Cython 加速连接组件算法
Posted
技术标签:
【中文标题】使用 Cython 加速连接组件算法【英文标题】:Using Cython to speed up connected components algorithm 【发布时间】:2012-08-20 03:55:45 【问题描述】:首先,我在 windows xp 机器上使用 python[2.7.2]、numpy[1.6.2rc1]、cython[0.16]、gcc[MinGW] 编译器。
我需要一个 3D 连接组件算法来处理存储在 numpy 数组中的一些 3D 二进制数据(即 1 和 0)。不幸的是,我找不到任何现有代码,因此我修改了找到的代码 here 以使用 3D 数组。一切都很好,但是速度对于处理庞大的数据集是可取的。结果我偶然发现了 cython,并决定试一试。
到目前为止,cython 已经提高了速度: 赛通:0.339 秒 蟒蛇:0.635 秒
使用cProfile,我在纯python版本中的耗时行是:
new_region = min(filter(lambda i: i > 0, array_region[xMin:xMax,yMin:yMax,zMin:zMax].ravel()))
问题:“cythonize”行的正确方法是什么:
new_region = min(filter(lambda i: i > 0, array_region[xMin:xMax,yMin:yMax,zMin:zMax].ravel()))
for x,y,z in zip(ind[0],ind[1],ind[2]):
任何帮助将不胜感激,希望这项工作对其他人有所帮助。
纯python版本[*.py]:
import numpy as np
def find_regions_3D(Array):
x_dim=np.size(Array,0)
y_dim=np.size(Array,1)
z_dim=np.size(Array,2)
regions =
array_region = np.zeros((x_dim,y_dim,z_dim),)
equivalences =
n_regions = 0
#first pass. find regions.
ind=np.where(Array==1)
for x,y,z in zip(ind[0],ind[1],ind[2]):
# get the region number from all surrounding cells including diagnols (27) or create new region
xMin=max(x-1,0)
xMax=min(x+1,x_dim-1)
yMin=max(y-1,0)
yMax=min(y+1,y_dim-1)
zMin=max(z-1,0)
zMax=min(z+1,z_dim-1)
max_region=array_region[xMin:xMax+1,yMin:yMax+1,zMin:zMax+1].max()
if max_region > 0:
#a neighbour already has a region, new region is the smallest > 0
new_region = min(filter(lambda i: i > 0, array_region[xMin:xMax+1,yMin:yMax+1,zMin:zMax+1].ravel()))
#update equivalences
if max_region > new_region:
if max_region in equivalences:
equivalences[max_region].add(new_region)
else:
equivalences[max_region] = set((new_region, ))
else:
n_regions += 1
new_region = n_regions
array_region[x,y,z] = new_region
#Scan Array again, assigning all equivalent regions the same region value.
for x,y,z in zip(ind[0],ind[1],ind[2]):
r = array_region[x,y,z]
while r in equivalences:
r= min(equivalences[r])
array_region[x,y,z]=r
#return list(regions.itervalues())
return array_region
纯 python 加速:
#Original line:
new_region = min(filter(lambda i: i > 0, array_region[xMin:xMax+1,yMin:yMax+1,zMin:zMax+1].ravel()))
#ver A:
new_region = array_region[xMin:xMax+1,yMin:yMax+1,zMin:zMax+1]
min(new_region[new_region>0])
#ver B:
new_region = min( i for i in array_region[xMin:xMax,yMin:yMax,zMin:zMax].ravel() if i>0)
#ver C:
sub=array_region[xMin:xMax,yMin:yMax,zMin:zMax]
nlist=np.where(sub>0)
minList=[]
for x,y,z in zip(nlist[0],nlist[1],nlist[2]):
minList.append(sub[x,y,z])
new_region=min(minList)
时间结果: O: 0.0220445 答:0.0002161 乙:0.0173195 C: 0.0002560
Cython 版本 [*.pyx]:
import numpy as np
cimport numpy as np
DTYPE = np.int
ctypedef np.int_t DTYPE_t
cdef inline int int_max(int a, int b): return a if a >= b else b
cdef inline int int_min(int a, int b): return a if a <= b else b
def find_regions_3D(np.ndarray Array not None):
cdef int x_dim=np.size(Array,0)
cdef int y_dim=np.size(Array,1)
cdef int z_dim=np.size(Array,2)
regions =
cdef np.ndarray array_region = np.zeros((x_dim,y_dim,z_dim),dtype=DTYPE)
equivalences =
cdef int n_regions = 0
#first pass. find regions.
ind=np.where(Array==1)
cdef int xMin, xMax, yMin, yMax, zMin, zMax, max_region, new_region, x, y, z
for x,y,z in zip(ind[0],ind[1],ind[2]):
# get the region number from all surrounding cells including diagnols (27) or create new region
xMin=int_max(x-1,0)
xMax=int_min(x+1,x_dim-1)+1
yMin=int_max(y-1,0)
yMax=int_min(y+1,y_dim-1)+1
zMin=int_max(z-1,0)
zMax=int_min(z+1,z_dim-1)+1
max_region=array_region[xMin:xMax,yMin:yMax,zMin:zMax].max()
if max_region > 0:
#a neighbour already has a region, new region is the smallest > 0
new_region = min(filter(lambda i: i > 0, array_region[xMin:xMax,yMin:yMax,zMin:zMax].ravel()))
#update equivalences
if max_region > new_region:
if max_region in equivalences:
equivalences[max_region].add(new_region)
else:
equivalences[max_region] = set((new_region, ))
else:
n_regions += 1
new_region = n_regions
array_region[x,y,z] = new_region
#Scan Array again, assigning all equivalent regions the same region value.
cdef int r
for x,y,z in zip(ind[0],ind[1],ind[2]):
r = array_region[x,y,z]
while r in equivalences:
r= min(equivalences[r])
array_region[x,y,z]=r
#return list(regions.itervalues())
return array_region
Cython 加速:
使用:
cdef np.ndarray region = np.zeros((3,3,3),dtype=DTYPE)
...
region=array_region[xMin:xMax,yMin:yMax,zMin:zMax]
new_region=np.min(region[region>0])
时间:0.170,原始:0.339 s
结果
在考虑了许多有用的 cmets 和提供的答案后,我当前的算法运行在: 赛通:0.0219 蟒蛇:0.4309
Cython 的速度比纯 python 提高了 20 倍。
当前 Cython 代码:
import numpy as np
import cython
cimport numpy as np
cimport cython
from libcpp.map cimport map
DTYPE = np.int
ctypedef np.int_t DTYPE_t
cdef inline int int_max(int a, int b): return a if a >= b else b
cdef inline int int_min(int a, int b): return a if a <= b else b
@cython.boundscheck(False)
def find_regions_3D(np.ndarray[DTYPE_t,ndim=3] Array not None):
cdef unsigned int x_dim=np.size(Array,0),y_dim=np.size(Array,1),z_dim=np.size(Array,2)
regions =
cdef np.ndarray[DTYPE_t,ndim=3] array_region = np.zeros((x_dim,y_dim,z_dim),dtype=DTYPE)
cdef np.ndarray region = np.zeros((3,3,3),dtype=DTYPE)
cdef map[int,int] equivalences
cdef unsigned int n_regions = 0
#first pass. find regions.
ind=np.where(Array==1)
cdef np.ndarray[DTYPE_t,ndim=1] ind_x = ind[0], ind_y = ind[1], ind_z = ind[2]
cells=range(len(ind_x))
cdef unsigned int xMin, xMax, yMin, yMax, zMin, zMax, max_region, new_region, x, y, z, i, xi, yi, zi, val
for i in cells:
x=ind_x[i]
y=ind_y[i]
z=ind_z[i]
# get the region number from all surrounding cells including diagnols (27) or create new region
xMin=int_max(x-1,0)
xMax=int_min(x+1,x_dim-1)+1
yMin=int_max(y-1,0)
yMax=int_min(y+1,y_dim-1)+1
zMin=int_max(z-1,0)
zMax=int_min(z+1,z_dim-1)+1
max_region = 0
new_region = 2000000000 # huge number
for xi in range(xMin, xMax):
for yi in range(yMin, yMax):
for zi in range(zMin, zMax):
val = array_region[xi,yi,zi]
if val > max_region: # val is the new maximum
max_region = val
if 0 < val < new_region: # val is the new minimum
new_region = val
if max_region > 0:
if max_region > new_region:
if equivalences.count(max_region) == 0 or new_region < equivalences[max_region]:
equivalences[max_region] = new_region
else:
n_regions += 1
new_region = n_regions
array_region[x,y,z] = new_region
#Scan Array again, assigning all equivalent regions the same region value.
cdef int r
for i in cells:
x=ind_x[i]
y=ind_y[i]
z=ind_z[i]
r = array_region[x,y,z]
while equivalences.count(r) > 0:
r= equivalences[r]
array_region[x,y,z]=r
return array_region
设置文件 [setup.py]
from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext
import numpy
setup(
cmdclass = 'build_ext': build_ext,
ext_modules = [Extension("ConnectComp", ["ConnectedComponents.pyx"],
include_dirs =[numpy.get_include()],
language="c++",
)]
)
构建命令:
python setup.py build_ext --inplace
【问题讨论】:
您是否考虑过使用networkx
或graphtool
来执行此操作?它们都具有连接组件算法,并且它们的正确性已经过充分测试。 networkx
设置起来也很简单。
如果您使用,我希望 python 版本(稍微)更快:new_region = min( i for i in array_region[xMin:xMax,yMin:yMax,zMin:zMax].ravel() if i>0)
或者,也许更好:region = array_region[...]; np.min(region[region>0])
。 Cython 甚至可以有效地将其翻译成 C :)
另外,如果可能的话,您应该尝试使用more efficient indexing by typing the array。
2D algorithm in Cython
【参考方案1】:
正如@gotgenes 指出的那样,您绝对应该使用cython -a <file>
,并尝试减少您看到的黄色量。黄色对应生成的 C 越来越差。
我发现减少黄色量的事情:
这看起来像是永远不会有任何越界数组访问的情况,只要输入 Array
具有 3 个维度,因此可以关闭边界检查:
cimport cython
@cython.boundscheck(False)
def find_regions_3d(...):
为编译器提供更多关于efficient indexing 的信息,即每当你cdef
和ndarray
提供尽可能多的信息时:
def find_regions_3D(np.ndarray[DTYPE_t,ndim=3] Array not None):
[...]
cdef np.ndarray[DTYPE_t,ndim=3] array_region = ...
[etc.]
为编译器提供有关正面/负面的更多信息。 IE。如果您知道某个变量总是为正数,请将 cdef
设为 unsigned int
而不是 int
,因为这意味着 Cython 可以消除任何负索引检查。
立即解压ind
元组,即
ind = np.where(Array==1)
cdef np.ndarray[DTYPE_t,ndim=1] ind_x = ind[0], ind_y = ind[1], ind_z = ind[2]
避免使用for x,y,z in zip(..[0],..[1],..[2])
构造。在这两种情况下,将其替换为
cdef int i
for i in range(len(ind_x)):
x = ind_x[i]
y = ind_y[i]
z = ind_z[i]
避免进行花哨的索引/切片。尤其是避免做两次!并避免使用filter
! IE。替换
max_region=array_region[xMin:xMax,yMin:yMax,zMin:zMax].max()
if max_region > 0:
new_region = min(filter(lambda i: i > 0, array_region[xMin:xMax,yMin:yMax,zMin:zMax].ravel()))
if max_region > new_region:
if max_region in equivalences:
equivalences[max_region].add(new_region)
else:
equivalences[max_region] = set((new_region, ))
更详细的
max_region = 0
new_region = 2000000000 # "infinity"
for xi in range(xMin, xMax):
for yi in range(yMin, yMax):
for zi in range(zMin, zMax):
val = array_region[xi,yi,zi]
if val > max_region: # val is the new maximum
max_region = val
if 0 < val < new_region: # val is the new minimum
new_region = val
if max_region > 0:
if max_region > new_region:
if max_region in equivalences:
equivalences[max_region].add(new_region)
else:
equivalences[max_region] = set((new_region, ))
else:
n_regions += 1
new_region = n_regions
这看起来不太好,但是三重循环编译到大约 10 行左右的 C 代码,而原始的编译版本有数百行长,并且有很多 Python 对象操作。
(显然你必须cdef
所有你使用的变量,尤其是xi
、yi
、zi
和val
在这段代码中。)
您不需要存储所有等价物,因为您对集合所做的唯一事情就是找到最小元素。因此,如果您将equivalences
映射为int
到int
,则可以替换
if max_region in equivalences:
equivalences[max_region].add(new_region)
else:
equivalences[max_region] = set((new_region, ))
[...]
while r in equivalences:
r = min(equivalences[r])
与
if max_region not in equivalences or new_region < equivalences[max_region]:
equivalences[max_region] = new_region
[...]
while r in equivalences:
r = equivalences[r]
最后要做的就是完全不使用任何 Python 对象,特别是不要为 equivalences
使用字典。现在这很容易,因为它将int
映射到int
,所以可以使用from libcpp.map cimport map
,然后使用cdef map[int,int] equivalences
,并将.. not in equivalences
替换为equivalences.count(..) == 0
,将.. in equivalences
替换为equivalences.count(..) > 0
。 (请注意,它需要 C++ 编译器。)
【讨论】:
感谢所有建议!对此,我真的非常感激。我会尝试合并所有这些。 @Onlyjus,如果您已经尝试了所有方法(并且有效),您可能应该只接受答案:) ...其他人很容易给出更好的答案! 我仍在努力,但在遵循您的大部分建议后,我的速度比纯 python 提高了 20 倍。 @Onlyjus,我猜它只删除了几个if
语句,所以速度差异是无法察觉的。【参考方案2】:
(抄自上述评论,方便他人阅读)
我相信 scipy 的 ndimage.label 可以满足您的要求(我没有针对您的代码对其进行测试,但它应该非常有效)。请注意,您必须明确导入它:
from scipy import ndimage
ndimage.label(your_data, connectivity_struct)
然后您可以应用其他内置函数(例如查找边界矩形、质心等)
【讨论】:
【参考方案3】:在针对 cython 进行优化时,您要确保在循环中主要使用原生 C 数据类型,而不是开销较高的 Python 对象。查找此类位置的最佳方法是查看生成的 C 代码并查找已转换为大量 Py* 函数调用的行。这些地方通常可以通过使用 cdef 变量而不是 python 对象来优化。
例如,在您的代码中,我怀疑带有zip
的循环会产生大量python 对象,使用int
索引进行迭代会更快,然后用于获取ind[0]
中的元素, .... 但是看看生成的 C 代码,看看什么似乎调用了许多不必要的 python 函数。
【讨论】:
我建议只使用cython -a <pyxfile>
并检查生成的 html 文件以查看 Cython 认为在查看 C 代码之前首先使用了许多 Python 对象的位置。不过,也许这就是你的意思?以上是关于使用 Cython 加速连接组件算法的主要内容,如果未能解决你的问题,请参考以下文章
Cython 似乎通过减少时间分析器而不是核心代码的开销来提供加速?
Pycharm 在 docker 托管应用程序上运行“使用 cython 的调试器加速”
使用cython库对python代码进行动态编译达到加速效果