沿轴插入 numpy.ndarray 的最佳方法

Posted

技术标签:

【中文标题】沿轴插入 numpy.ndarray 的最佳方法【英文标题】:Best way to interpolate a numpy.ndarray along an axis 【发布时间】:2015-05-10 04:22:00 【问题描述】:

我在numpy.ndarray 中有 4 维数据,比如温度。 数组的形状是(ntime, nheight_in, nlat, nlon)

每个维度都有对应的一维数组,告诉我某个值对应于哪个时间、高度、纬度和经度,对于这个例子,我需要height_in 给出以米为单位的高度。

现在我需要把它放到一个不同的高度维度上,height_out,具有不同的长度。

以下似乎可以满足我的要求:

ntime, nheight_in, nlat, nlon = t_in.shape

nheight_out = len(height_out)
t_out = np.empty((ntime, nheight_out, nlat, nlon))

for time in range(ntime):
    for lat in range(nlat):
        for lon in range(nlon):
            t_out[time, :, lat, lon] = np.interp(
                height_out, height_in, t[time, :, lat, lon]
            )

但是有 3 个嵌套循环,并且在 python 和 numpy 之间切换很多,我认为这不是最好的方法。

关于如何改进这一点有什么建议吗?谢谢

【问题讨论】:

【参考方案1】:

scipyinterp1d可以帮忙:

import numpy as np
from scipy.interpolate import interp1d

ntime, nheight_in, nlat, nlon = (10, 20, 30, 40)

heights = np.linspace(0, 1, nheight_in)

t_in = np.random.normal(size=(ntime, nheight_in, nlat, nlon))
f_out = interp1d(heights, t_in, axis=1)

nheight_out = 50
new_heights = np.linspace(0, 1, nheight_out)
t_out = f_out(new_heights)

【讨论】:

非常感谢,但您的方法是否可能比我上面显示的方法使用更多的内存。在测试它时,我注意到你的方法比我的方法快得多,直到数组大小超过一定水平(在我的情况下为 90x50x181x360),突然你的方法变得比我的慢得多。 interp1d 可能会插入一个“数据点”的线性序列,每个数据点都是一个完整的 3D 数组。所以,如果这些数组变大,你就会开始点击交换文件。你可以通过对整个数组的块运行它来避免这种情况,或者可以找到一些可以为你做这件事的高级库。 interp1d 需要大量内存,并且无法推断 @dashesy:“需要大量内存” - OP 担心速度。 “不推断” - 这就是为什么它被称为“interp”。也就是说,如果您有更好的 numpy/scipy 函数解决方案,请务必发布,我会第一个支持它。 @fjarri 没有更好的解决方案,只是想让未来的读者了解这些警告;特别是内存消耗非常高,可以在一维切片上替换为InterpolatedUnivariateSpline【参考方案2】:

我一直在寻找一个适用于不规则间隔坐标的类似函数,并最终编写了我自己的函数。据我所知,插值处理得很好,内存和速度方面的性能也相当不错。我想我会在这里分享它,以防其他人在寻找类似功能时遇到这个问题:

import numpy as np
import warnings

def interp_along_axis(y, x, newx, axis, inverse=False, method='linear'):
    """ Interpolate vertical profiles, e.g. of atmospheric variables
    using vectorized numpy operations

    This function assumes that the x-xoordinate increases monotonically

    ps:
    * Updated to work with irregularly spaced x-coordinate.
    * Updated to work with irregularly spaced newx-coordinate
    * Updated to easily inverse the direction of the x-coordinate
    * Updated to fill with nans outside extrapolation range
    * Updated to include a linear interpolation method as well
        (it was initially written for a cubic function)

    Peter Kalverla
    March 2018

    --------------------
    More info:
    Algorithm from: http://www.paulinternet.nl/?page=bicubic
    It approximates y = f(x) = ax^3 + bx^2 + cx + d
    where y may be an ndarray input vector
    Returns f(newx)

    The algorithm uses the derivative f'(x) = 3ax^2 + 2bx + c
    and uses the fact that:
    f(0) = d
    f(1) = a + b + c + d
    f'(0) = c
    f'(1) = 3a + 2b + c

    Rewriting this yields expressions for a, b, c, d:
    a = 2f(0) - 2f(1) + f'(0) + f'(1)
    b = -3f(0) + 3f(1) - 2f'(0) - f'(1)
    c = f'(0)
    d = f(0)

    These can be evaluated at two neighbouring points in x and
    as such constitute the piecewise cubic interpolator.
    """

    # View of x and y with axis as first dimension
    if inverse:
        _x = np.moveaxis(x, axis, 0)[::-1, ...]
        _y = np.moveaxis(y, axis, 0)[::-1, ...]
        _newx = np.moveaxis(newx, axis, 0)[::-1, ...]
    else:
        _y = np.moveaxis(y, axis, 0)
        _x = np.moveaxis(x, axis, 0)
        _newx = np.moveaxis(newx, axis, 0)

    # Sanity checks
    if np.any(_newx[0] < _x[0]) or np.any(_newx[-1] > _x[-1]):
        # raise ValueError('This function cannot extrapolate')
        warnings.warn("Some values are outside the interpolation range. "
                      "These will be filled with NaN")
    if np.any(np.diff(_x, axis=0) < 0):
        raise ValueError('x should increase monotonically')
    if np.any(np.diff(_newx, axis=0) < 0):
        raise ValueError('newx should increase monotonically')

    # Cubic interpolation needs the gradient of y in addition to its values
    if method == 'cubic':
        # For now, simply use a numpy function to get the derivatives
        # This produces the largest memory overhead of the function and
        # could alternatively be done in passing.
        ydx = np.gradient(_y, axis=0, edge_order=2)

    # This will later be concatenated with a dynamic '0th' index
    ind = [i for i in np.indices(_y.shape[1:])]

    # Allocate the output array
    original_dims = _y.shape
    newdims = list(original_dims)
    newdims[0] = len(_newx)
    newy = np.zeros(newdims)

    # set initial bounds
    i_lower = np.zeros(_x.shape[1:], dtype=int)
    i_upper = np.ones(_x.shape[1:], dtype=int)
    x_lower = _x[0, ...]
    x_upper = _x[1, ...]

    for i, xi in enumerate(_newx):
        # Start at the 'bottom' of the array and work upwards
        # This only works if x and newx increase monotonically

        # Update bounds where necessary and possible
        needs_update = (xi > x_upper) & (i_upper+1<len(_x))
        # print x_upper.max(), np.any(needs_update)
        while np.any(needs_update):
            i_lower = np.where(needs_update, i_lower+1, i_lower)
            i_upper = i_lower + 1
            x_lower = _x[[i_lower]+ind]
            x_upper = _x[[i_upper]+ind]

            # Check again
            needs_update = (xi > x_upper) & (i_upper+1<len(_x))

        # Express the position of xi relative to its neighbours
        xj = (xi-x_lower)/(x_upper - x_lower)

        # Determine where there is a valid interpolation range
        within_bounds = (_x[0, ...] < xi) & (xi < _x[-1, ...])

        if method == 'linear':
            f0, f1 = _y[[i_lower]+ind], _y[[i_upper]+ind]
            a = f1 - f0
            b = f0

            newy[i, ...] = np.where(within_bounds, a*xj+b, np.nan)

        elif method=='cubic':
            f0, f1 = _y[[i_lower]+ind], _y[[i_upper]+ind]
            df0, df1 = ydx[[i_lower]+ind], ydx[[i_upper]+ind]

            a = 2*f0 - 2*f1 + df0 + df1
            b = -3*f0 + 3*f1 - 2*df0 - df1
            c = df0
            d = f0

            newy[i, ...] = np.where(within_bounds, a*xj**3 + b*xj**2 + c*xj + d, np.nan)

        else:
            raise ValueError("invalid interpolation method"
                             "(choose 'linear' or 'cubic')")

    if inverse:
        newy = newy[::-1, ...]

    return np.moveaxis(newy, 0, axis)

这是一个测试它的小例子:

import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d as scipy1d

# toy coordinates and data
nx, ny, nz = 25, 30, 10
x = np.arange(nx)
y = np.arange(ny)
z = np.tile(np.arange(nz), (nx,ny,1)) + np.random.randn(nx, ny, nz)*.1
testdata = np.random.randn(nx,ny,nz) # x,y,z

# Desired z-coordinates (must be between bounds of z)
znew = np.tile(np.linspace(2,nz-2,50), (nx,ny,1)) + np.random.randn(nx, ny, 50)*0.01

# Inverse the coordinates for testing
z = z[..., ::-1]
znew = znew[..., ::-1]

# Now use own routine
ynew = interp_along_axis(testdata, z, znew, axis=2, inverse=True)

# Check some random profiles
for i in range(5):
    randx = np.random.randint(nx)
    randy = np.random.randint(ny)

    checkfunc = scipy1d(z[randx, randy], testdata[randx,randy], kind='cubic')
    checkdata = checkfunc(znew)

    fig, ax = plt.subplots()
    ax.plot(testdata[randx, randy], z[randx, randy], 'x', label='original data')
    ax.plot(checkdata[randx, randy], znew[randx, randy], label='scipy')
    ax.plot(ynew[randx, randy], znew[randx, randy], '--', label='Peter')
    ax.legend()
    plt.show()

【讨论】:

很好!!很有用!谢谢!【参考方案3】:

按照 numpy.interp 的标准,可以将左/右边界分配给在within_bounds = ... 之后添加此行的范围之外的点

out_lbound = (xi <= _x[0,...])
out_rbound = (_x[-1,...] <= xi)

newy[i, out_lbound] = _y[0, out_lbound]
newy[i, out_rbound] = _y[-1, out_rbound]

newy[i, ...] = ... 之后。

如果我很好理解@Peter9192 使用的策略,我认为这些变化是一致的。我检查了一下,但可能是一些奇怪的情况无法正常工作。

【讨论】:

以上是关于沿轴插入 numpy.ndarray 的最佳方法的主要内容,如果未能解决你的问题,请参考以下文章

Numpy:ndarray数据类型和运算

获取二维 numpy ndarray 或 numpy 矩阵中前 N 个值的索引

创建没有固定第二维的3D numpy.ndarray

NumPy Ndarray对象

Numpy Ndarray 对象

Python机器学习(三十五)NumPy ndarray