Numpy / Scipy中的快速线性插值“沿路径”
Posted
技术标签:
【中文标题】Numpy / Scipy中的快速线性插值“沿路径”【英文标题】:Fast linear interpolation in Numpy / Scipy "along a path" 【发布时间】:2016-01-09 05:42:54 【问题描述】:假设我有来自山上 3 个(已知)高度的气象站的数据。具体来说,每个站点每分钟都会记录其所在位置的温度测量值。我有两种想要执行的插值。而且我希望能够快速执行每个操作。
所以让我们设置一些数据:
import numpy as np
from scipy.interpolate import interp1d
import pandas as pd
import seaborn as sns
np.random.seed(0)
N, sigma = 1000., 5
basetemps = 70 + (np.random.randn(N) * sigma)
midtemps = 50 + (np.random.randn(N) * sigma)
toptemps = 40 + (np.random.randn(N) * sigma)
alltemps = np.array([basetemps, midtemps, toptemps]).T # note transpose!
trend = np.sin(4 / N * np.arange(N)) * 30
trend = trend[:, np.newaxis]
altitudes = np.array([500, 1500, 4000]).astype(float)
finaltemps = pd.DataFrame(alltemps + trend, columns=altitudes)
finaltemps.index.names, finaltemps.columns.names = ['Time'], ['Altitude']
finaltemps.plot()
太好了,所以我们的温度是这样的:
所有时间都插值到相同的高度:
我认为这个很简单。假设我想每次获得海拔 1,000 的温度。我可以使用内置的scipy
插值方法:
interping_function = interp1d(altitudes, finaltemps.values)
interped_to_1000 = interping_function(1000)
fig, ax = plt.subplots(1, 1, figsize=(8, 5))
finaltemps.plot(ax=ax, alpha=0.15)
ax.plot(interped_to_1000, label='Interped')
ax.legend(loc='best', title=finaltemps.columns.name)
这很好用。让我们看看速度:
%%timeit
res = interp1d(altitudes, finaltemps.values)(1000)
#-> 1000 loops, best of 3: 207 µs per loop
“沿路径”插值:
所以现在我有第二个相关的问题。假设我知道远足派对的高度是时间的函数,并且我想通过随时间线性插值我的数据来计算他们(移动)位置的温度。 特别是,我知道远足聚会地点的时间与我知道气象站温度的时间相同。我也可以做到这一点努力:
location = np.linspace(altitudes[0], altitudes[-1], N)
interped_along_path = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc)
for i, loc in enumerate(location)])
fig, ax = plt.subplots(1, 1, figsize=(8, 5))
finaltemps.plot(ax=ax, alpha=0.15)
ax.plot(interped_along_path, label='Interped')
ax.legend(loc='best', title=finaltemps.columns.name)
所以这非常有效,但重要的是要注意上面的关键行是使用列表推导来隐藏大量工作。在前面的案例中,scipy
正在为我们创建一个插值函数,并在大量数据上对其进行一次评估。在这种情况下,scipy
实际上是在构造N
单独的插值函数,并在少量数据上对每个插值函数进行一次评估。这感觉本质上是低效的。这里(在列表理解中)潜伏着一个 for 循环,而且,这感觉很松散。
毫不奇怪,这比前一种情况要慢得多:
%%timeit
res = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc)
for i, loc in enumerate(location)])
#-> 10 loops, best of 3: 145 ms per loop
所以第二个例子比第一个慢 1000 倍。 IE。与繁重的工作是“制作线性插值函数”步骤的想法一致……在第二个示例中发生了 1,000 次,但在第一个示例中仅发生了一次。
那么,问题是:有没有更好的方法来解决第二个问题?例如,有没有一种很好的方法来设置二维插值(也许可以处理这种情况哪里知道远足聚会地点的时间不是温度采样的时间)?或者有没有一种特别巧妙的方式来处理时间安排得当的事情?还是其他?
【问题讨论】:
现在这个是如何写一个问题的! 谢谢!现在你教我如何写一个杀手级的答案! :) 【参考方案1】:对于一个固定的时间点,可以利用以下插值函数:
g(a) = cc[0]*abs(a-aa[0]) + cc[1]*abs(a-aa[1]) + cc[2]*abs(a-aa[2])
其中a
是徒步旅行者的海拔高度,aa
是具有 3 次测量的向量 altitudes
和 cc
是具有系数的向量。需要注意三点:
-
对于对应于
aa
的给定温度 (alltemps
),可以通过使用 np.linalg.solve()
求解线性矩阵方程来确定 cc
。
g(a)
易于矢量化为 (N,) 维 a
和 (N, 3) 维 cc
(分别包括 np.linalg.solve()
)。
g(a)
被称为一阶单变量样条核(用于三个点)。使用abs(a-aa[i])**(2*d-1)
会将样条顺序更改为d
。这种方法可以解释为Gaussian Process in Machine Learning 的简化版本。
所以代码是:
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
# generate temperatures
np.random.seed(0)
N, sigma = 1000, 5
trend = np.sin(4 / N * np.arange(N)) * 30
alltemps = np.array([tmp0 + trend + sigma*np.random.randn(N)
for tmp0 in [70, 50, 40]])
# generate attitudes:
altitudes = np.array([500, 1500, 4000]).astype(float)
location = np.linspace(altitudes[0], altitudes[-1], N)
def doit():
""" do the interpolation, improved version for speed """
AA = np.vstack([np.abs(altitudes-a_i) for a_i in altitudes])
# This is slighty faster than np.linalg.solve(), because AA is small:
cc = np.dot(np.linalg.inv(AA), alltemps)
return (cc[0]*np.abs(location-altitudes[0]) +
cc[1]*np.abs(location-altitudes[1]) +
cc[2]*np.abs(location-altitudes[2]))
t_loc = doit() # call interpolator
# do the plotting:
fg, ax = plt.subplots(num=1)
for alt, t in zip(altitudes, alltemps):
ax.plot(t, label="%d feet" % alt, alpha=.5)
ax.plot(t_loc, label="Interpolation")
ax.legend(loc="best", title="Altitude:")
ax.set_xlabel("Time")
ax.set_ylabel("Temperature")
fg.canvas.draw()
测量时间给出:
In [2]: %timeit doit()
10000 loops, best of 3: 107 µs per loop
更新:我替换了 doit()
中的原始列表推导式
导入速度提高 30% (For N=1000
)。
此外,根据要求比较,@moarningsun 在我的机器上的基准代码块:
10 loops, best of 3: 110 ms per loop
interp_checked
10000 loops, best of 3: 83.9 µs per loop
scipy_interpn
1000 loops, best of 3: 678 µs per loop
Output allclose:
[True, True, True]
请注意,N=1000
是一个相对较小的数字。使用N=100000
会产生结果:
interp_checked
100 loops, best of 3: 8.37 ms per loop
%timeit doit()
100 loops, best of 3: 5.31 ms per loop
这表明这种方法比interp_checked
方法更适合大型N
。
【讨论】:
这是一个非常新颖的解决方案。为了比较起见,您能否将您在同一台机器上针对所提出的其他替代方案显示类似执行的计时结果? @8one6 做了基准测试并删除了一些列表推导以提高速度。 使用return np.einsum('ij,ij->j', cc, np.abs(location - altitudes.reshape(-1,1)))
作为您的退货线路。您还应该矢量化您的 AA 构建,以防止形成如此多的中间体。
感谢您的提示。我尝试了您的 einsum()
行 - 有趣的是它更慢(6 毫秒,而 N=100000
为 4.25 毫秒)。不太清楚为什么。我尝试使用np.vectorize()
并没有成功生成可运行的代码。由于 AA
只有维度 (3,3),我不确定并行化速度的提升是否会取代调用 np.vectorize()
的开销。【参考方案2】:
两个值y1
、y2
在位置x1
和x2
之间相对于点xi
的线性插值很简单:
yi = y1 + (y2-y1) * (xi-x1) / (x2-x1)
通过一些向量化的 Numpy 表达式,我们可以从数据集中选择相关点并应用上述函数:
I = np.searchsorted(altitudes, location)
x1 = altitudes[I-1]
x2 = altitudes[I]
time = np.arange(len(alltemps))
y1 = alltemps[time,I-1]
y2 = alltemps[time,I]
xI = location
yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)
问题是有些点位于已知范围的边界(甚至之外),应该考虑到这一点:
I = np.searchsorted(altitudes, location)
same = (location == altitudes.take(I, mode='clip'))
out_of_range = ~same & ((I == 0) | (I == altitudes.size))
I[out_of_range] = 1 # Prevent index-errors
x1 = altitudes[I-1]
x2 = altitudes[I]
time = np.arange(len(alltemps))
y1 = alltemps[time,I-1]
y2 = alltemps[time,I]
xI = location
yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)
yI[out_of_range] = np.nan
幸运的是,Scipy 已经提供了 ND 插值,这也很容易处理不匹配时间,例如:
from scipy.interpolate import interpn
time = np.arange(len(alltemps))
M = 150
hiketime = np.linspace(time[0], time[-1], M)
location = np.linspace(altitudes[0], altitudes[-1], M)
xI = np.column_stack((hiketime, location))
yI = interpn((time, altitudes), alltemps, xI)
这是一个基准代码(实际上没有任何pandas
,我确实包含了另一个答案中的解决方案):
import numpy as np
from scipy.interpolate import interp1d, interpn
def original():
return np.array([interp1d(altitudes, alltemps[i, :])(loc)
for i, loc in enumerate(location)])
def OP_self_answer():
return np.diagonal(interp1d(altitudes, alltemps)(location))
def interp_checked():
I = np.searchsorted(altitudes, location)
same = (location == altitudes.take(I, mode='clip'))
out_of_range = ~same & ((I == 0) | (I == altitudes.size))
I[out_of_range] = 1 # Prevent index-errors
x1 = altitudes[I-1]
x2 = altitudes[I]
time = np.arange(len(alltemps))
y1 = alltemps[time,I-1]
y2 = alltemps[time,I]
xI = location
yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)
yI[out_of_range] = np.nan
return yI
def scipy_interpn():
time = np.arange(len(alltemps))
xI = np.column_stack((time, location))
yI = interpn((time, altitudes), alltemps, xI)
return yI
N, sigma = 1000., 5
basetemps = 70 + (np.random.randn(N) * sigma)
midtemps = 50 + (np.random.randn(N) * sigma)
toptemps = 40 + (np.random.randn(N) * sigma)
trend = np.sin(4 / N * np.arange(N)) * 30
trend = trend[:, np.newaxis]
alltemps = np.array([basetemps, midtemps, toptemps]).T + trend
altitudes = np.array([500, 1500, 4000], dtype=float)
location = np.linspace(altitudes[0], altitudes[-1], N)
funcs = [original, interp_checked, scipy_interpn]
for func in funcs:
print(func.func_name)
%timeit func()
from itertools import combinations
outs = [func() for func in funcs]
print('Output allclose:')
print([np.allclose(out1, out2) for out1, out2 in combinations(outs, 2)])
在我的系统上出现以下结果:
original
10 loops, best of 3: 184 ms per loop
OP_self_answer
10 loops, best of 3: 89.3 ms per loop
interp_checked
1000 loops, best of 3: 224 µs per loop
scipy_interpn
1000 loops, best of 3: 1.36 ms per loop
Output allclose:
[True, True, True, True, True, True]
Scipy 的 interpn
与最快的方法相比在速度方面有所下降,但由于它的通用性和易用性,它绝对是要走的路。
【讨论】:
我希望这可以成为关于这种情况下最佳实践的公开对话。为此,您能否添加一些计时数据?特别是,您能否将我在我的问题(以及我提出的答案)中描述的方法与您在上面提出的方法一起计时,以便每个人都能看到相对速度? @8one6 - 你是对的,这绝对是不错的选择。您还希望以更通用的方式编写函数吗? IMO 的基本思想现在应该很明显了。 我觉得你写的很合理。令我惊讶的是,您的代码在original
和 OP_self_answer
之间仅显示了 2 倍的差异,而在我的机器上,相同的两个函数的执行时间似乎相差 10 倍。我想知道为什么。
@8one6 - 我不指望我的 8 年旧笔记本具有代表性 :) 这可能是因为 CPU 缓存小或 RAM 慢或其他原因;我重新进行了测试,结果还是一样。如果您愿意,您可以按自己的时间进行编辑,脚本应该可以按原样运行。【参考方案3】:
我将提供一点进展。在第二种情况下(“沿路径”插值),我们制作了许多不同的插值函数。我们可以尝试的一件事是只制作一个插值函数(在上述第一种情况下始终在高度维度上进行插值)并一遍又一遍地评估该函数(以矢量化方式)。这会给我们提供比我们想要的更多的数据(它会给我们一个 1,000 x 1,000 的矩阵,而不是一个 1,000 元素的向量)。但是我们的目标结果将只是沿着对角线。所以问题是,以更复杂的参数调用单个函数是否比创建多个函数并使用简单参数调用它们运行得更快?
答案是肯定的!
关键是scipy.interpolate.interp1d
返回的插值函数能够接受numpy.ndarray
作为其输入。因此,您可以通过输入向量输入以 C 速度多次有效地调用插值函数。 IE。这比编写一个在标量输入上一遍又一遍地调用插值函数的 for 循环要快得多。因此,虽然我们计算了许多最终丢弃的数据点,但我们无需构建许多我们几乎不使用的不同插值函数,从而节省了更多时间。
old_way = interped_along_path = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc)
for i, loc in enumerate(location)])
# look ma, no for loops!
new_way = np.diagonal(interp1d(altitudes, finaltemps.values)(location))
# note, `location` is a vector!
abs(old_way - new_way).max()
#-> 0.0
然而:
%%timeit
res = np.diagonal(interp1d(altitudes, finaltemps.values)(location))
#-> 100 loops, best of 3: 16.7 ms per loop
因此,这种方法可以使我们提高 10 倍!谁能做得更好?或者建议一种完全不同的方法?
【讨论】:
以上是关于Numpy / Scipy中的快速线性插值“沿路径”的主要内容,如果未能解决你的问题,请参考以下文章
scipy 0.11.0 到 0.12.0 更改了线性 scipy.interpolate.interp1d,破坏了我不断更新的插值器