Python:重写一个循环的 numpy 数学函数以在 GPU 上运行

Posted

技术标签:

【中文标题】Python:重写一个循环的 numpy 数学函数以在 GPU 上运行【英文标题】:Python: rewrite a looping numpy math function to run on GPU 【发布时间】:2017-06-16 21:20:35 【问题描述】:

有人可以帮我重写这个函数doTheMath 函数) 以在 GPU 上进行计算吗?我现在用了几天的好日子试图解决它,但没有结果。我想知道也许有人可以帮助我以任何你看起来适合日志的方式重写这个函数,因为我在最后给出了相同的结果。我尝试使用numba 中的@jit,但由于某种原因,它实际上比照常运行代码要慢得多。由于样本量很大,目标是大大减少执行时间,所以我自然相信 GPU 是最快的方法。

我会稍微解释一下实际发生的情况。真实数据看起来与下面代码中创建的样本数据几乎相同,被划分为每个样本大约 5.000.000 行或每个文件大约 150MB 的样本大小。总共有大约 600.000.000 行或 20GB 的数据。我必须循环遍历这些数据,逐个样本,然后在每个样本中逐行,获取每行的最后 2000 行(或另一行)并运行返回结果的 doTheMath 函数。然后将该结果保存回硬盘驱动器,在那里我可以用另一个程序用它做一些其他事情。正如您在下面看到的,我不需要所有行的所有结果,只需要那些大于特定数量的结果。如果我现在在 python 中运行我的函数,每 1.000.000 行大约需要 62 秒。考虑到所有数据以及应该以多快的速度完成,这是一个很长的时间。

我必须提到,我在data = joblib.load(file) 的帮助下将真实数据文件逐个文件上传到 RAM,因此上传数据不是问题,因为每个文件只需要大约 0.29 秒。上传后,我运行下面的整个代码。耗时最长的是doTheMath 函数。我愿意将我在 *** 上的所有 500 个声誉积分作为奖励,以奖励愿意帮助我重写这个简单代码以在 GPU 上运行的人。我的兴趣特别在于 GPU,我真的很想看看它是如何解决手头的这个问题的。

编辑/更新 1: 这是一个真实数据小样本的链接:data_csv.zip 大约 102000 行真实数据 1 和 2000 行真实数据 2a 和数据 2b。在真实样本数据上使用minimumLimit = 400

编辑/更新 2: 对于那些关注这篇文章的人,这里是以下答案的简短摘要。到目前为止,我们对原始解决方案有 4 个答案。 @Divakar 提供的只是对原始代码的调整。在这两个调整中,只有第一个实际上适用于这个问题,第二个是一个很好的调整,但不适用于这里。在其他三个答案中,其中两个是基于 CPU 的解决方案和一个 tensorflow-GPU 尝试。 Paul Panzer 的 Tensorflow-GPU 似乎很有希望,但是当我在 GPU 上实际运行它时,它比原来的要慢,所以代码仍然需要改进。

其他两个基于 CPU 的解决方案由 @PaulPanzer(纯 numpy 解决方案)和 @MSeifert(一个 numba 解决方案)提交。与原始代码相比,这两种解决方案都提供了非常好的结果,并且处理数据的速度都非常快。在这两个中,Paul Panzer 提交的一个更快。它在大约 3 秒内处理大约 1.000.000 行。唯一的问题是较小的 batchSizes,这可以通过切换到 MSeifert 提供的 numba 解决方案来克服,甚至可以通过下面讨论的所有调整后的原始代码来解决。

我非常高兴并感谢 @PaulPanzer 和 @MSeifert 为他们的答案所做的工作。尽管如此,由于这是一个关于基于 GPU 的解决方案的问题,我正在等待看看是否有人愿意尝试使用 GPU 版本,看看与当前 CPU 相比,GPU 上的数据处理速度有多快解决方案。如果没有其他答案优于@PaulPanzer 的纯 numpy 解决方案,那么我将接受他的答案作为正确的答案并获得赏金:)

编辑/更新 3: @Divakar 发布了一个带有 GPU 解决方案的新答案。经过我对真实数据的测试,速度甚至比不上 CPU 对应的解决方案。 GPU 在大约 1.5 秒内处理大约 5.000.000。这太不可思议了 :) 我对 GPU 解决方案感到非常兴奋,我感谢 @Divakar 发布它。我也感谢@PaulPanzer 和@MSeifert 提供的 CPU 解决方案 :) 现在由于 GPU 的存在,我的研究以令人难以置信的速度继续进行 :)

import pandas as pd
import numpy as np
import time

def doTheMath(tmpData1, data2a, data2b):
    A = tmpData1[:, 0]
    B = tmpData1[:,1]
    C = tmpData1[:,2]
    D = tmpData1[:,3]
    Bmax = B.max()
    Cmin  = C.min()
    dif = (Bmax - Cmin)
    abcd = ((((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
    return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

#Declare variables
batchSize = 2000
sampleSize = 5000000
resultArray = []
minimumLimit = 490 #use 400 on the real sample data 

#Create Random Sample Data
data1 = np.matrix(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #upper limit
data2b = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #lower limit
#approx. half of data2a will be smaller than data2b, but that is only in the sample data because it is randomly generated, NOT the real data. The real data2a is always higher than data2b.


#Loop through the data
t0 = time.time()
for rowNr in  range(data1.shape[0]):
    tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
    if(tmp_df.shape[0] == batchSize):
        result = doTheMath(tmp_df, data2a, data2b)
        if (result >= minimumLimit):
            resultArray.append([rowNr , result])
print('Runtime:', time.time() - t0)

#Save data results
resultArray = np.array(resultArray)
print(resultArray[:,1].sum())
resultArray = pd.DataFrame('index':resultArray[:,0], 'result':resultArray[:,1])
resultArray.to_csv("Result Array.csv", sep=';')

我正在研究的 PC 规格:

GTX970(4gb) video card; 
i7-4790K CPU 4.00Ghz; 
16GB RAM;
a SSD drive 
running Windows 7; 

作为一个附带问题,SLI 中的第二个显卡是否有助于解决这个问题?

【问题讨论】:

SLI 无关紧要,与 CUDA 无关。至于如何转换该代码——您可以坐在计算机前,在计算机中输入新的 CUDA 内核代码。如果你想在两个 GPU 上运行它,你还需要输入 API 代码来管理在两个 GPU 上运行代码。 您可以随时尝试numba,它可以尝试在某种程度上自动使用CUDA。更好的方法是使用 Theano/Tensorflow 的计算图并在它们的框架内实现您的算法,以便为 GPU 编译它。但是,是的,一般来说,这是关于了解 CUDA 并使用提到的 talonmies 等可用工具为其定制算法。 感谢@sascha 的建议。我认为 Theano 和 Tensorflow 仅适用于机器学习问题。我会暂时看看 numba @RaduS 不,它们是用于数学计算的通用工具。 我认为最大的改进是使用初始化的输出数组:resultArray,然后在每次迭代中索引到它进行更新,而不是从空列表开始并使用慢速append 【参考方案1】:

介绍及解决方案代码

嗯,你要求的!因此,这篇文章中列出了一个带有PyCUDA 的实现,它使用轻量级包装器在 Python 环境中扩展了 CUDA 的大部分功能。我们将使用它的SourceModule 功能,让我们在 Python 环境中编写和编译 CUDA 内核。

解决手头的问题,在涉及的计算中,我们有滑动最大值和最小值,很少有差异、划分和比较。对于涉及块最大查找(对于每个滑动窗口)的最大和最小部分,我们将使用缩减技术,如在here 中详细讨论的那样。这将在块级别完成。对于跨滑动窗口的上层迭代,我们将使用网格级别索引到 CUDA 资源。有关此块和网格格式的更多信息,请参阅page-18。 PyCUDA 还支持用于计算缩减(如 max 和 min)的内置函数,但我们失去了控制,特别是我们打算使用专用内存(如共享内存和常量内存)以接近最佳水平利用 GPU。

列出 PyCUDA-NumPy 解决方案代码 -

1] PyCUDA 部分 -

import pycuda.autoinit
import pycuda.driver as drv
import numpy as np
from pycuda.compiler import SourceModule

mod = SourceModule("""
#define TBP 1024 // THREADS_PER_BLOCK

__device__ void get_Bmax_Cmin(float* out, float *d1, float *d2, int L, int offset)

    int tid = threadIdx.x;
    int inv = TBP;
    __shared__ float dS[TBP][2];

    dS[tid][0] = d1[tid+offset];  
    dS[tid][1] = d2[tid+offset];         
    __syncthreads();

    if(tid<L-TBP)  
    
        dS[tid][0] = fmaxf(dS[tid][0] , d1[tid+inv+offset]);
        dS[tid][1] = fminf(dS[tid][1] , d2[tid+inv+offset]);
    
    __syncthreads();
    inv = inv/2;

    while(inv!=0)   
    
        if(tid<inv)
        
            dS[tid][0] = fmaxf(dS[tid][0] , dS[tid+inv][0]);
            dS[tid][1] = fminf(dS[tid][1] , dS[tid+inv][1]);
        
        __syncthreads();
        inv = inv/2;
    
    __syncthreads();

    if(tid==0)
    
        out[0] = dS[0][0];
        out[1] = dS[0][1];
       
    __syncthreads();


__global__ void main1(float* out, float *d0, float *d1, float *d2, float *d3, float *lowL, float *highL, int *BLOCKLEN)

    int L = BLOCKLEN[0];
    int tid = threadIdx.x;
    int iterID = blockIdx.x;
    float Bmax_Cmin[2];
    int inv;
    float Cmin, dif;   
    __shared__ float dS[TBP*2];   

    get_Bmax_Cmin(Bmax_Cmin, d1, d2, L, iterID);  
    Cmin = Bmax_Cmin[1];
    dif = (Bmax_Cmin[0] - Cmin);

    inv = TBP;

    dS[tid] = (d0[tid+iterID] + d1[tid+iterID] + d2[tid+iterID] + d3[tid+iterID] - 4.0*Cmin) / (4.0*dif);
    __syncthreads();

    if(tid<L-TBP)  
        dS[tid+inv] = (d0[tid+inv+iterID] + d1[tid+inv+iterID] + d2[tid+inv+iterID] + d3[tid+inv+iterID] - 4.0*Cmin) / (4.0*dif);                   

     dS[tid] = ((dS[tid] >= lowL[tid]) & (dS[tid] <= highL[tid])) ? 1 : 0;
     __syncthreads();

     if(tid<L-TBP)
         dS[tid] += ((dS[tid+inv] >= lowL[tid+inv]) & (dS[tid+inv] <= highL[tid+inv])) ? 1 : 0;
     __syncthreads();

    inv = inv/2;
    while(inv!=0)   
    
        if(tid<inv)
            dS[tid] += dS[tid+inv];
        __syncthreads();
        inv = inv/2;
    

    if(tid==0)
        out[iterID] = dS[0];
    __syncthreads();


""")

请注意THREADS_PER_BLOCK, TBP 是基于batchSize 设置的。这里的经验法则是将 2 的幂值分配给 TBP,它刚好小于 batchSize。因此,对于batchSize = 2000,我们需要TBP 作为1024

2] NumPy 部分 -

def gpu_app_v1(A, B, C, D, batchSize, minimumLimit):
    func1 = mod.get_function("main1")
    outlen = len(A)-batchSize+1

    # Set block and grid sizes
    BSZ = (1024,1,1)
    GSZ = (outlen,1)

    dest = np.zeros(outlen).astype(np.float32)
    N = np.int32(batchSize)
    func1(drv.Out(dest), drv.In(A), drv.In(B), drv.In(C), drv.In(D), \
                     drv.In(data2b), drv.In(data2a),\
                     drv.In(N), block=BSZ, grid=GSZ)
    idx = np.flatnonzero(dest >= minimumLimit)
    return idx, dest[idx]

基准测试

我已经在 GTX 960M 上进行了测试。请注意,PyCUDA 期望数组是连续的。因此,我们需要对列进行切片并制作副本。我期望/假设可以从文件中读取数据,以便数据沿行而不是列分布。因此,暂时将它们排除在基准测试功能之外。

原始方法-

def org_app(data1, batchSize, minimumLimit):
    resultArray = []
    for rowNr in  range(data1.shape[0]-batchSize+1):
        tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
        result = doTheMath(tmp_df, data2a, data2b)
        if (result >= minimumLimit):
            resultArray.append([rowNr , result]) 
    return resultArray

时间和验证 -

In [2]: #Declare variables
   ...: batchSize = 2000
   ...: sampleSize = 50000
   ...: resultArray = []
   ...: minimumLimit = 490 #use 400 on the real sample data
   ...: 
   ...: #Create Random Sample Data
   ...: data1 = np.random.uniform(1, 100000, (sampleSize + batchSize, 4)).astype(np.float32)
   ...: data2b = np.random.uniform(0, 1, (batchSize)).astype(np.float32)
   ...: data2a = data2b + np.random.uniform(0, 1, (batchSize)).astype(np.float32)
   ...: 
   ...: # Make column copies
   ...: A = data1[:,0].copy()
   ...: B = data1[:,1].copy()
   ...: C = data1[:,2].copy()
   ...: D = data1[:,3].copy()
   ...: 
   ...: gpu_out1,gpu_out2 = gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
   ...: cpu_out1,cpu_out2 = np.array(org_app(data1, batchSize, minimumLimit)).T
   ...: print(np.allclose(gpu_out1, cpu_out1))
   ...: print(np.allclose(gpu_out2, cpu_out2))
   ...: 
True
False

因此,CPU 和 GPU 计数之间存在一些差异。让我们调查一下——

In [7]: idx = np.flatnonzero(~np.isclose(gpu_out2, cpu_out2))

In [8]: idx
Out[8]: array([12776, 15208, 17620, 18326])

In [9]: gpu_out2[idx] - cpu_out2[idx]
Out[9]: array([-1., -1.,  1.,  1.])

有四个不匹配计数的实例。 1 最大关闭。经过研究,我发现了一些关于此的信息。基本上,因为我们使用数学内在函数进行最大和最小计算,而我认为导致浮动 pt 表示中的最后一个二进制位与 CPU 对应部分不同。这称为 ULP 错误,已在 herehere 进行了详细讨论。

最后,抛开问题,让我们来看看最重要的一点,性能-

In [10]: %timeit org_app(data1, batchSize, minimumLimit)
1 loops, best of 3: 2.18 s per loop

In [11]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
10 loops, best of 3: 82.5 ms per loop

In [12]: 2180.0/82.5
Out[12]: 26.424242424242426

让我们尝试使用更大的数据集。使用sampleSize = 500000,我们得到 -

In [14]: %timeit org_app(data1, batchSize, minimumLimit)
1 loops, best of 3: 23.2 s per loop

In [15]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
1 loops, best of 3: 821 ms per loop

In [16]: 23200.0/821
Out[16]: 28.25822168087698

因此,加速保持在 27 左右。

限制:

1) 我们使用 float32 数字,因为 GPU 最适合这些数字。特别是在非服务器 GPU 上的双精度在性能方面并不受欢迎,因为您使用的是这样的 GPU,所以我使用 float32 进行了测试。

进一步改进:

1) 我们可以使用更快的constant memory 来输入data2adata2b,而不是使用global memory

【讨论】:

@RaduS 请务必查看已编辑的代码(刚刚编辑)以进行基准测试!现在它接受任意的batchSize @RaduS 当然,我认为今晚会这样做:) @RaduS 删除了Clarification #1 : Issue on value mis-match 部分,因为看起来问题只是错误的添加部分:) @RaduS 1,2,3, Boom!:D 啊 GPU 太神奇了!不久前我正在学习 CUDA,通过你的赏金,给了我重新学习 CUDA 的动力,非常感谢!要学的东西太多了。 @Divakar 只是顺便过来祝贺一下!我有一半想进一步调整我的,但你的太好了。【参考方案2】:

这里有一些代码来演示通过调整算法可以实现什么。它是纯粹的 numpy,但根据您发布的示例数据,其速度比原始版本大约提高了 35 倍(在我相当普通的机器上,约 2.5 秒内约 1,000,000 个样本):

>>> result_dict = master('run')
[('load', 0.82578349113464355), ('precomp', 0.028138399124145508), ('max/min', 0.24333405494689941), ('ABCD', 0.015314102172851562), ('main', 1.3356468677520752)]
TOTAL 2.44821691513

使用的调整:

A+B+C+D,看我的其他回答 运行 min/max,包括避免使用相同的 Cmin/dif 多次计算 (A+B+C+D - 4Cmin)/(4dif)。

这些或多或少是例行公事。这留下了与 data2a/b 的比较,后者是昂贵的 O(NK),其中 N 是样本数,K 是窗口大小。 在这里,人们可以利用相对表现良好的数据。使用运行的 min/max 可以创建 data2a/b 的变体,可用于一次测试一系列窗口偏移量,如果测试失败,则可以立即排除所有这些偏移量,否则将范围一分为二。

import numpy as np
import time

# global variables; they will hold the precomputed pre-screening filters
preA, preB = , 
CHUNK_SIZES = None

def sliding_argmax(data, K=2000):
    """compute the argmax of data over a sliding window of width K

    returns:
        indices  -- indices into data
        switches -- window offsets at which the maximum changes
                    (strictly speaking: where the index of the maximum changes)
                    excludes 0 but includes maximum offset (len(data)-K+1)

    see last line of compute_pre_screening_filter for a recipe to convert
    this representation to the vector of maxima
    """
    N = len(data)
    last = np.argmax(data[:K])
    indices = [last]
    while indices[-1] <= N - 1:
        ge = np.where(data[last + 1 : last + K + 1] > data[last])[0]
        if len(ge) == 0:
            if last + K >= N:
                break
            last += 1 + np.argmax(data[last + 1 : last + K + 1])
            indices.append(last)
        else:
            last += 1 + ge[0]
            indices.append(last)
    indices = np.array(indices)
    switches = np.where(data[indices[1:]] > data[indices[:-1]],
                        indices[1:] + (1-K), indices[:-1] + 1)
    return indices, np.r_[switches, [len(data)-K+1]]


def compute_pre_screening_filter(bound, n_offs):
    """compute pre-screening filter for point-wise upper bound

    given a K-vector of upper bounds B and K+n_offs-1-vector data
    compute K+n_offs-1-vector filter such that for each index j
    if for any offset 0 <= o < n_offs and index 0 <= i < K such that
    o + i = j, the inequality B_i >= data_j holds then filter_j >= data_j

    therefore the number of data points below filter is an upper bound for
    the maximum number of points below bound in any K-window in data
    """
    pad_l, pad_r = np.min(bound[:n_offs-1]), np.min(bound[1-n_offs:])
    padded = np.r_[pad_l+np.zeros(n_offs-1,), bound, pad_r+np.zeros(n_offs-1,)]
    indices, switches = sliding_argmax(padded, n_offs)
    return padded[indices].repeat(np.diff(np.r_[[0], switches]))


def compute_all_pre_screening_filters(upper, lower, min_chnk=5, dyads=6):
    """compute upper and lower pre-screening filters for data blocks of
    sizes K+n_offs-1 where
    n_offs = min_chnk, 2min_chnk, ..., 2^(dyads-1)min_chnk

    the result is stored in global variables preA and preB
    """
    global CHUNK_SIZES

    CHUNK_SIZES = min_chnk * 2**np.arange(dyads)
    preA[1] = upper
    preB[1] = lower
    for n in CHUNK_SIZES:
        preA[n] = compute_pre_screening_filter(upper, n)
        preB[n] = -compute_pre_screening_filter(-lower, n)


def test_bounds(block, counts, threshold=400):
    """test whether the windows fitting in the data block 'block' fall
    within the bounds using pre-screening for efficient bulk rejection

    array 'counts' will be overwritten with the counts of compliant samples
    note that accurate counts will only be returned for above threshold
    windows, because the analysis of bulk rejected windows is short-circuited

    also note that bulk rejection only works for 'well behaved' data and
    for example not on random numbers
    """
    N = len(counts)
    K = len(preA[1])
    r = N % CHUNK_SIZES[0]
    # chop up N into as large as possible chunks with matching pre computed
    # filters
    # start with small and work upwards
    counts[:r] = [np.count_nonzero((block[l:l+K] <= preA[1]) &
                                   (block[l:l+K] >= preB[1]))
                  for l in range(r)]

    def bisect(block, counts):
        M = len(counts)
        cnts = np.count_nonzero((block <= preA[M]) & (block >= preB[M]))
        if cnts < threshold:
            counts[:] = cnts
            return
        elif M == CHUNK_SIZES[0]:
            counts[:] = [np.count_nonzero((block[l:l+K] <= preA[1]) &
                                          (block[l:l+K] >= preB[1]))
                         for l in range(M)]
        else:
            M //= 2
            bisect(block[:-M], counts[:M])
            bisect(block[M:], counts[M:])

    N = N // CHUNK_SIZES[0]
    for M in CHUNK_SIZES:
        if N % 2:
            bisect(block[r:r+M+K-1], counts[r:r+M])
            r += M
        elif N == 0:
            return
        N //= 2
    else:
        for j in range(2*N):
            bisect(block[r:r+M+K-1], counts[r:r+M])
            r += M


def analyse(data, use_pre_screening=True, min_chnk=5, dyads=6,
            threshold=400):
    samples, upper, lower = data
    N, K = samples.shape[0], upper.shape[0]
    times = [time.time()]
    if use_pre_screening:
        compute_all_pre_screening_filters(upper, lower, min_chnk, dyads)
    times.append(time.time())
    # compute switching points of max and min for running normalisation
    upper_inds, upper_swp = sliding_argmax(samples[:, 1], K)
    lower_inds, lower_swp = sliding_argmax(-samples[:, 2], K)
    times.append(time.time())
    # sum columns
    ABCD = samples.sum(axis=-1)
    times.append(time.time())
    counts = np.empty((N-K+1,), dtype=int)
    # main loop
    # loop variables:
    offs = 0
    u_ind, u_scale, u_swp = 0, samples[upper_inds[0], 1], upper_swp[0]
    l_ind, l_scale, l_swp = 0, samples[lower_inds[0], 2], lower_swp[0]
    while True:
        # check which is switching next, min(C) or max(B)
        if u_swp > l_swp:
            # greedily take the largest block possible such that dif and Cmin
            # do not change
            block = (ABCD[offs:l_swp+K-1] - 4*l_scale) \
                    * (0.25 / (u_scale-l_scale))
            if use_pre_screening:
                test_bounds(block, counts[offs:l_swp], threshold=threshold)
            else:
                counts[offs:l_swp] = [
                    np.count_nonzero((block[l:l+K] <= upper) &
                                     (block[l:l+K] >= lower))
                    for l in range(l_swp - offs)]
            # book keeping
            l_ind += 1
            offs = l_swp
            l_swp = lower_swp[l_ind]
            l_scale = samples[lower_inds[l_ind], 2]
        else:
            block = (ABCD[offs:u_swp+K-1] - 4*l_scale) \
                    * (0.25 / (u_scale-l_scale))
            if use_pre_screening:
                test_bounds(block, counts[offs:u_swp], threshold=threshold)
            else:
                counts[offs:u_swp] = [
                    np.count_nonzero((block[l:l+K] <= upper) &
                                     (block[l:l+K] >= lower))
                    for l in range(u_swp - offs)]
            u_ind += 1
            if u_ind == len(upper_inds):
                assert u_swp == N-K+1
                break
            offs = u_swp
            u_swp = upper_swp[u_ind]
            u_scale = samples[upper_inds[u_ind], 1]
    times.append(time.time())
    return 'counts': counts, 'valid': np.where(counts >= 400)[0],
            'timings': np.diff(times)


def master(mode='calibrate', data='fake', use_pre_screening=True, nrep=3,
           min_chnk=None, dyads=None):
    t = time.time()
    if data in ('fake', 'load'):
        data1 = np.loadtxt('data1.csv', delimiter=';', skiprows=1,
                           usecols=[1,2,3,4])
        data2a = np.loadtxt('data2a.csv', delimiter=';', skiprows=1,
                            usecols=[1])
        data2b = np.loadtxt('data2b.csv', delimiter=';', skiprows=1,
                            usecols=[1])
        if data == 'fake':
            data1 = np.tile(data1, (10, 1))
        threshold = 400
    elif data == 'random':
        data1 = np.random.random((102000, 4))
        data2b = np.random.random(2000)
        data2a = np.random.random(2000)
        threshold = 490
        if use_pre_screening or mode == 'calibrate':
            print('WARNING: pre-screening not efficient on artificial data')
    else:
        raise ValueError("data mode  not recognised".format(data))
    data = data1, data2a, data2b
    t_load = time.time() - t
    if mode == 'calibrate':
        min_chnk = (2, 3, 4, 5, 6) if min_chnk is None else min_chnk
        dyads = (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) if dyads is None else dyads
        timings = np.zeros((len(min_chnk), len(dyads)))
        print('max bisect  ' + ' '.join([
            '   n.a.' if dy == 0 else ':7d'.format(dy) for dy in dyads]),
              end='')
        for i, mc in enumerate(min_chnk):
            print('\nmin chunk '.format(mc), end=' ')
            for j, dy in enumerate(dyads):
                for k in range(nrep):
                    if dy == 0: # no pre-screening
                        timings[i, j] += analyse(
                            data, False, mc, dy, threshold)['timings'][3]
                    else:
                        timings[i, j] += analyse(
                            data, True, mc, dy, threshold)['timings'][3]
                timings[i, j] /= nrep
                print(':7.3f'.format(timings[i, j]), end=' ', flush=True)
        best_mc, best_dy = np.unravel_index(np.argmin(timings.ravel()),
                                            timings.shape)
        print('\nbest', min_chnk[best_mc], dyads[best_dy])
        return timings, min_chnk[best_mc], dyads[best_dy]
    if mode == 'run':
        min_chnk = 2 if min_chnk is None else min_chnk
        dyads = 5 if dyads is None else dyads
        res = analyse(data, use_pre_screening, min_chnk, dyads, threshold)
        times = np.r_[[t_load], res['timings']]
        print(list(zip(('load', 'precomp', 'max/min', 'ABCD', 'main'), times)))
        print('TOTAL', times.sum())
        return res

【讨论】:

哇,这真是令人印象深刻的结果,我喜欢你的方法。我看到 res_indices 返回高于阈值的所有索引的列表。是如何在 where 之后获得同一数组中每个索引的结果编号? 您可以直接在 out out[res_indices] 上使用 res_indices 为您提供在每个偏移量为 400 或更多时满足您标准的点数。您能否在更多数据上测试脚本?我根据您发布的示例对其进行了调整,但我很想知道它是否也适用于其他示例。 我现在在真实数据上测试了很多你的脚本,速度惊人,准确性100%正确。每 1 百万行我得到大约 3 秒。考虑到它仅在 CPU 上运行这一事实,这确实令人印象深刻。我对脚本的结果感到非常高兴,尽管我有点难以理解所有内容:) 考虑一下它是有道理的,因为主要的节省之一是利用了滑动最大值不会经常变化的事实。现在,您制作的窗口越小,这就越不真实,因此当您的积蓄消失时,您仍然会被所有这些棘手代码的开销所困扰。如果您要使用非常小的窗口,其他策略可能会执行得更好...... 我忍不住多修修补补。新代码修复了两个小错误并有一个新的sliding_argmax,在我的标准 1,000,000 百万样本测试中,它在我的平台上又缩短了半秒。所以我们减少到 2.5 秒,其中 0.8 秒用于加载数据!【参考方案3】:

在开始调整目标 (GPU) 或使用其他任何东西(即并行执行)之前,您可能需要考虑如何改进现有代码。您使用了numba-tag,所以我将使用它来改进代码:首先我们对数组而不是矩阵进行操作:

data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit

每次调用doTheMath 时,您都希望返回一个整数,但是您使用了很多数组并创建了很多中间数组:

abcd = ((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ((C   - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

这会在每一步创建一个中间数组:

tmp1 = A-Cmin, tmp2 = tmp1 / dif, tmp3 = B - Cmin, tmp4 = tmp3 / dif ...你明白要点了。

然而这是一个reduce函数(数组->整数)所以有很多中间数组是不必要的权重,只需计算“fly”的值。

import numba as nb

@nb.njit
def doTheMathNumba(tmpData, data2a, data2b):
    Bmax = np.max(tmpData[:, 1])
    Cmin = np.min(tmpData[:, 2])
    diff = (Bmax - Cmin)
    idiff = 1 / diff
    sum_ = 0
    for i in range(tmpData.shape[0]):
        val = (tmpData[i, 0] + tmpData[i, 1] + tmpData[i, 2] + tmpData[i, 3]) / 4 * idiff - Cmin * idiff
        if val <= data2a[i] and val >= data2b[i]:
            sum_ += 1
    return sum_

为了避免多次操作,我在这里做了其他事情:

(((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4
= ((A - Cmin + B - Cmin + C - Cmin + D - Cmin) / dif) / 4
= (A + B + C + D - 4 * Cmin) / (4 * dif)
= (A + B + C + D) / (4 * dif) - (Cmin / dif)

这实际上将我的计算机上的执行时间缩短了近 10 倍:

%timeit doTheMath(tmp_df, data2a, data2b)       # 1000 loops, best of 3: 446 µs per loop
%timeit doTheMathNumba(tmp_df, data2a, data2b)  # 10000 loops, best of 3: 59 µs per loop

当然还有其他改进,例如使用滚动最小值/最大值来计算 BmaxCmin,这将使至少部分计算在 O(sampleSize) 而不是 O(samplesize * batchsize) 中运行。这也使得重用一些(A + B + C + D) / (4 * dif) - (Cmin / dif) 计算成为可能,因为如果CminBmax 不改变下一个样本,这些值不会不同。这有点复杂,因为比较不同。但绝对有可能!见这里:

import time
import numpy as np
import numba as nb

@nb.njit
def doTheMathNumba(abcd, data2a, data2b, Bmax, Cmin):
    diff = (Bmax - Cmin)
    idiff = 1 / diff
    quarter_idiff = 0.25 * idiff
    sum_ = 0
    for i in range(abcd.shape[0]):
        val = abcd[i] * quarter_idiff - Cmin * idiff
        if val <= data2a[i] and val >= data2b[i]:
            sum_ += 1
    return sum_

@nb.njit
def doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, resultArray):
    found = 0
    for rowNr in range(data1.shape[0]):
        if(abcd[rowNr:rowNr + batchSize].shape[0] == batchSize):
            result = doTheMathNumba(abcd[rowNr:rowNr + batchSize], 
                                    data2a, data2b, Bmaxs[rowNr], Cmins[rowNr])
            if (result >= minimumLimit):
                resultArray[found, 0] = rowNr
                resultArray[found, 1] = result
                found += 1
    return resultArray[:found]

#Declare variables
batchSize = 2000
sampleSize = 50000
resultArray = []
minimumLimit = 490 #use 400 on the real sample data 

data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit

from scipy import ndimage
t0 = time.time()
abcd = np.sum(data1, axis=1)
Bmaxs = ndimage.maximum_filter1d(data1[:, 1], 
                                 size=batchSize, 
                                 origin=-((batchSize-1)//2-1))  # correction for even shapes
Cmins = ndimage.minimum_filter1d(data1[:, 2], 
                                 size=batchSize, 
                                 origin=-((batchSize-1)//2-1))

result = np.zeros((sampleSize, 2), dtype=np.int64)
doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, result)
print('Runtime:', time.time() - t0)

这给了我一个Runtime: 0.759593152999878(在 numba 编译函数之后!),而你原来的有Runtime: 24.68975639343262。现在我们的速度提高了 30 倍!

根据您的样本量,它仍然需要Runtime: 60.187848806381226,但这还不错,对吧?

即使我自己没有这样做,numba 表示可以写成"Numba for CUDA GPUs",而且看起来并不复杂。

【讨论】:

"这也可以重用一些 (A + B + C + D) / (4 * dif) - (Cmin / dif) 计算,因为如果 Cmin 和 Bmax 不为下一个样本更改这些值并没有什么不同。这有点复杂......“完成了,将在几分钟内发布。很快,我使用的是纯 numpy。 好的,我必须更正我之前的陈述,因为我做错了什么,它只快了 30 倍:( @PaulPanzer 是的,可以再次实现所有这些功能(而不是使用 scipy 过滤器),但我认为你的 numpy 代码非常复杂,而且在我的计算机上也更慢(不多,但几乎慢 2 倍)。所以我不认为在这里“使用纯 numpy”是一个优势。此外:Numba 实际上可以为 GPU 编译代码,即使我自己没有这样做。 :) 您使用的是实际数据还是只是随机数?这里有很大的不同(x2 - x3)。顺便提一句。我对 numpy 或 numba 之类的东西并不虔诚,我只是指出,我看到的 30 倍主要归功于改进的算法,如果您可以使用 numba 或其他东西获得更多信息,那就更好了。哦,scipy 实际上有一个滑动 argmax 甚至一个滑动最大值吗? 嗨@MSeifert 感谢您提交答案。现在我在相当大量的真实数据上测试了你的方法。结果确实更快,没有我预期的 numba 快,但它们比我原来的版本快。有一个问题。结果的准确度完全为0%。一定是计算有问题,嗯,我明天看看能不能找到问题【参考方案4】:

这在技术上是题外话(不是 GPU),但我相信你会感兴趣的。

有一个明显且相当大的节省:

预计算A + B + C + D(不在循环中,在整个数据上:data1.sum(axis=-1)),因为abcd = ((A+B+C+D) - 4Cmin) / (4dif)。这将节省相当多的操作。

很惊讶之前没有人发现那个 ;-)

编辑:

还有一件事,虽然我怀疑这只是在你的例子中,而不是在你的真实数据中:

目前data2a 的大约一半将小于data2b。在这些地方,您对abcd 的条件不能同时为真,因此您甚至不需要在那里计算abcd

编辑:

我在下面使用的另一个调整但忘了提及:如果您计算移动窗口上的最大值(或最小值)。例如,当您向右移动一点时,最大值发生变化的可能性有多大?只有两件事可以改变它:右边的新点更大(大约在 windowlength 时间内发生一次,即使发生了,您也会立即知道新的最大值)或旧的最大值从窗口中掉下来 在左侧(在 windowlength 时间内也大约发生一次)。只有在最后一种情况下,您才需要在整个窗口中搜索新的最大值。

编辑:

忍不住在 tensorflow 中尝试一下。我没有 GPU,所以你必须自己测试它的速度。将“cpu”的“gpu”放在标记的行上。

在 cpu 上,它的速度大约是原始实现的一半(即没有 Divakar 的调整)。请注意,我冒昧地将输入从矩阵更改为普通数组。目前 tensorflow 是一个移动的目标,因此请确保您拥有正确的版本。我使用 Python3.6 和 tf 0.12.1 如果您今天执行 pip3 install tensorflow-gpu 它应该可能会工作。

import numpy as np
import time
import tensorflow as tf

# currently the max/min code is sequential
# thus
parallel_iterations = 1
# but you can put this in a separate loop, precompute and then try and run
# the remainder of doTheMathTF with a larger parallel_iterations

# tensorflow is quite capricious about its data types
ddf = tf.float64
ddi = tf.int32

def worker(data1, data2a, data2b):
    ###################################
    # CHANGE cpu to gpu in next line! #
    ###################################
    with tf.device('/cpu:0'):
        g = tf.Graph ()
        with g.as_default():
            ABCD = tf.constant(data1.sum(axis=-1), dtype=ddf)
            B = tf.constant(data1[:, 1], dtype=ddf)
            C = tf.constant(data1[:, 2], dtype=ddf)
            window = tf.constant(len(data2a))
            N = tf.constant(data1.shape[0] - len(data2a) + 1, dtype=ddi)
            data2a = tf.constant(data2a, dtype=ddf)
            data2b = tf.constant(data2b, dtype=ddf)
            def doTheMathTF(i, Bmax, Bmaxind, Cmin, Cminind, out):
                # most of the time we can keep the old max/min
                Bmaxind = tf.cond(Bmaxind<i,
                                  lambda: i + tf.to_int32(
                                      tf.argmax(B[i:i+window], axis=0)),
                                  lambda: tf.cond(Bmax>B[i+window-1], 
                                                  lambda: Bmaxind, 
                                                  lambda: i+window-1))
                Cminind = tf.cond(Cminind<i,
                                  lambda: i + tf.to_int32(
                                      tf.argmin(C[i:i+window], axis=0)),
                                  lambda: tf.cond(Cmin<C[i+window-1],
                                                  lambda: Cminind,
                                                  lambda: i+window-1))
                Bmax = B[Bmaxind]
                Cmin = C[Cminind]
                abcd = (ABCD[i:i+window] - 4 * Cmin) * (1 / (4 * (Bmax-Cmin)))
                out = out.write(i, tf.to_int32(
                    tf.count_nonzero(tf.logical_and(abcd <= data2a,
                                                    abcd >= data2b))))
                return i + 1, Bmax, Bmaxind, Cmin, Cminind, out
            with tf.Session(graph=g) as sess:
                i, Bmaxind, Bmax, Cminind, Cmin, out = tf.while_loop(
                    lambda i, _1, _2, _3, _4, _5: i<N, doTheMathTF,
                    (tf.Variable(0, dtype=ddi), tf.Variable(0.0, dtype=ddf),
                     tf.Variable(-1, dtype=ddi),
                     tf.Variable(0.0, dtype=ddf), tf.Variable(-1, dtype=ddi),
                     tf.TensorArray(ddi, size=N)),
                    shape_invariants=None,
                    parallel_iterations=parallel_iterations,
                    back_prop=False)
                out = out.pack()
                sess.run(tf.initialize_all_variables())
                out, = sess.run((out,))
    return out

#Declare variables
batchSize = 2000
sampleSize = 50000#00
resultArray = []

#Create Sample Data
data1 = np.random.uniform(1, 100, (sampleSize + batchSize, 4))
data2a = np.random.uniform(0, 1, (batchSize,))
data2b = np.random.uniform(0, 1, (batchSize,))

t0 = time.time()
out = worker(data1, data2a, data2b)
print('Runtime (tensorflow):', time.time() - t0)


good_indices, = np.where(out >= 490)
res_tf = np.c_[good_indices, out[good_indices]]

def doTheMath(tmpData1, data2a, data2b):
    A = tmpData1[:, 0]
    B  = tmpData1[:,1]
    C   = tmpData1[:,2]
    D = tmpData1[:,3]
    Bmax = B.max()
    Cmin  = C.min()
    dif = (Bmax - Cmin)
    abcd = ((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ((C   - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
    return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

#Loop through the data
t0 = time.time()
for rowNr in  range(sampleSize+1):
    tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
    result = doTheMath(tmp_df, data2a, data2b)
    if (result >= 490):
        resultArray.append([rowNr , result])
print('Runtime (original):', time.time() - t0)
print(np.alltrue(np.array(resultArray)==res_tf))

【讨论】:

感谢保罗的回答。我在两台安装了 Windows 的独立计算机上测试了代码,Python3.5 和 tf 0.12.1。出于某种原因,tensorflow 版本比原始版本慢,即使我激活 GPU,它仍然比原始版本慢。以下是一些统计数据: Pc1 未安装 GPU:Runtime (tensorflow): 9.321721315383911 Runtime (original): 3.7472479343414307 True 已安装并激活 GPU 的 Pc2:Runtime (tensorflow): 72.36511659622192 Runtime (original): 3.5680108070373535 True 我也收到警告'WARNING:tensorflow:From C:/testings.py:61 in worker.: initialize_all_variables (from tensorflow.python.ops.variables) is deprecated and will be removed after 2017-03-02. Instructions for updating: Use tf.global_variables_initializer instead.' 这只是对您发送的代码的测试,没有更改数据或样本大小。它会因为是 Windows 而变慢吗?还是因为我有 python 3.5 而不是 3.6?还是有其他原因? @RaduS 恐怕在涉及到 tensorflow 时,我正在涉足自己。据我所知,分析和调试是一场噩梦。让我们等几天。也许一些 tf buff 会拿起线程。如果没有,我可以再看看。你可以试试thisrecipy 来了解是什么让它这么慢。抱歉,我现在无法提供更多帮助。 感谢@PaulPanzer 试一试。顺便说一句,我在问题编辑中上传了一个示例数据,如果您想对其进行测试【参考方案5】:

调整 #1

通常建议在使用 NumPy 数组时对事物进行矢量化。但是对于非常大的数组,我认为你没有选择。因此,为了提高性能,可以对求和的最后一步进行微调。

我们可以替换生成1s0s 数组并求和的步骤:

np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

使用np.count_nonzero 可以有效地计算布尔数组中的True 值,而不是转换为1s0s -

np.count_nonzero((abcd <= data2a) & (abcd >= data2b))

运行时测试-

In [45]: abcd = np.random.randint(11,99,(10000))

In [46]: data2a = np.random.randint(11,99,(10000))

In [47]: data2b = np.random.randint(11,99,(10000))

In [48]: %timeit np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()
10000 loops, best of 3: 81.8 µs per loop

In [49]: %timeit np.count_nonzero((abcd <= data2a) & (abcd >= data2b))
10000 loops, best of 3: 28.8 µs per loop

调整 #2

在处理隐式广播的情况时使用预先计算的倒数。更多信息here。因此,存储dif 的倒数并在步骤中使用它:

((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ...

样本测试-

In [52]: A = np.random.rand(10000)

In [53]: dif = 0.5

In [54]: %timeit A/dif
10000 loops, best of 3: 25.8 µs per loop

In [55]: %timeit A*(1.0/dif)
100000 loops, best of 3: 7.94 µs per loop

您有四个地方使用除以dif。所以,希望这也能带来显着的提升!

【讨论】:

嗨@Divakar,关于tweak#2,我阅读了您链接的帖子并尝试实施它。但它接缝我没有得到相同的结果。也许我做错了什么。你能看看吗?也许你更容易发现错误dif = 1.0 /(Bmax - Cmin)然后abcd = ((dif * A) + ((dif * B) + (dif*C) + (dif*D)) / 4) @RaduS 好吧,如果BmaxCmin 接近,那么Bmax - Cmin 将是一个小数,而它的倒数将是一个大数。因此,稍后当数组乘以该数字时,我们将有不同的数字。所以,我们可能会跳过那个调整。

以上是关于Python:重写一个循环的 numpy 数学函数以在 GPU 上运行的主要内容,如果未能解决你的问题,请参考以下文章

Numpy

numpy

Python机器学习(四十四)NumPy 数学函数

Python-NumPy

python---Numpy模块中线性代数运算,统计和数学函数

使用 CUDA 在 python 中展开一个可并行化的 for 循环