数百万个 3D 点:如何找到最接近给定点的 10 个?

Posted

技术标签:

【中文标题】数百万个 3D 点:如何找到最接近给定点的 10 个?【英文标题】:Millions of 3D points: How to find the 10 of them closest to a given point? 【发布时间】:2011-01-29 23:24:42 【问题描述】:

3-d 中的点由 (x,y,z) 定义。任意两点 (X,Y,Z) 和 (x,y,z) 之间的距离 d 为 d= Sqrt[(X-x)^2 + (Y-y)^2 + (Z-z)^2]。 现在一个文件中有一百万个条目,每个条目都是空间中的某个点,没有特定的顺序。给定任意点 (a,b,c) 找到离它最近的 10 个点。您将如何存储这百万个点以及如何从该数据结构中检索这 10 个点。

【问题讨论】:

您是在被告知要点 (a,b,c) 是什么之前还是之后创建和填充数据结构?例如,如果您先创建数据结构,然后用户键入 (a,b,c) 并​​希望立即得到答案,则 David 的答案不起作用。 好点(没有双关语!)当然,如果事先不知道(a,b,c),则更多的是优化现有点列表以按3D位置搜索的问题,而不是实际进行搜索。 真正应该澄清的是,是否需要考虑准备数据结构和在该数据结构中存储百万点的成本,或者只考虑检索性能。如果该成本无关紧要,那么无论您检索多少次,kd-tree 都将获胜。如果该成本确实很重要,那么您还应该指定您希望运行搜索的次数(对于少量搜索,蛮力将获胜,对于较大的 kd 将获胜)。 【参考方案1】:

百万分是个小数目。最直接的方法在这里有效(基于 KDTree 的代码速度较慢(仅查询一个点)。

蛮力方法(时间 ~1 秒)

#!/usr/bin/env python
import numpy

NDIM = 3 # number of dimensions

# read points into array
a = numpy.fromfile('million_3D_points.txt', sep=' ')
a.shape = a.size / NDIM, NDIM

point = numpy.random.uniform(0, 100, NDIM) # choose random point
print 'point:', point
d = ((a-point)**2).sum(axis=1)  # compute distances
ndx = d.argsort() # indirect sort 

# print 10 nearest points to the chosen one
import pprint
pprint.pprint(zip(a[ndx[:10]], d[ndx[:10]]))

运行它:

$ time python nearest.py 
point: [ 69.06310224   2.23409409  50.41979143]
[(array([ 69.,   2.,  50.]), 0.23500677815852947),
 (array([ 69.,   2.,  51.]), 0.39542392750839772),
 (array([ 69.,   3.,  50.]), 0.76681859086988302),
 (array([ 69.,   3.,  50.]), 0.76681859086988302),
 (array([ 69.,   3.,  51.]), 0.9272357402197513),
 (array([ 70.,   2.,  50.]), 1.1088022980015722),
 (array([ 70.,   2.,  51.]), 1.2692194473514404),
 (array([ 70.,   2.,  51.]), 1.2692194473514404),
 (array([ 70.,   3.,  51.]), 1.801031260062794),
 (array([ 69.,   1.,  51.]), 1.8636121147970444)]

real    0m1.122s
user    0m1.010s
sys 0m0.120s

这是生成百万个 3D 点的脚本:

#!/usr/bin/env python
import random
for _ in xrange(10**6):
    print ' '.join(str(random.randrange(100)) for _ in range(3))

输出:

$ head million_3D_points.txt

18 56 26
19 35 74
47 43 71
82 63 28
43 82 0
34 40 16
75 85 69
88 58 3
0 63 90
81 78 98

您可以使用该代码来测试更复杂的数据结构和算法(例如,它们是否真的消耗更少的内存或比上述最简单的方法更快)。值得注意的是,目前它是唯一包含工作代码的答案。

基于KDTree的解决方案(时间~1.4秒)

#!/usr/bin/env python
import numpy

NDIM = 3 # number of dimensions

# read points into array
a = numpy.fromfile('million_3D_points.txt', sep=' ')
a.shape = a.size / NDIM, NDIM

point =  [ 69.06310224,   2.23409409,  50.41979143] # use the same point as above
print 'point:', point


from scipy.spatial import KDTree

# find 10 nearest points
tree = KDTree(a, leafsize=a.shape[0]+1)
distances, ndx = tree.query([point], k=10)

# print 10 nearest points to the chosen one
print a[ndx]

运行它:

$ time python nearest_kdtree.py  

point: [69.063102240000006, 2.2340940900000001, 50.419791429999997]
[[[ 69.   2.  50.]
  [ 69.   2.  51.]
  [ 69.   3.  50.]
  [ 69.   3.  50.]
  [ 69.   3.  51.]
  [ 70.   2.  50.]
  [ 70.   2.  51.]
  [ 70.   2.  51.]
  [ 70.   3.  51.]
  [ 69.   1.  51.]]]

real    0m1.359s
user    0m1.280s
sys 0m0.080s

C++ 中的部分排序(时间 ~1.1 秒)

// $ g++ nearest.cc && (time ./a.out < million_3D_points.txt )
#include <algorithm>
#include <iostream>
#include <vector>

#include <boost/lambda/lambda.hpp>  // _1
#include <boost/lambda/bind.hpp>    // bind()
#include <boost/tuple/tuple_io.hpp>

namespace 
  typedef double coord_t;
  typedef boost::tuple<coord_t,coord_t,coord_t> point_t;

  coord_t distance_sq(const point_t& a, const point_t& b)  // or boost::geometry::distance
    coord_t x = a.get<0>() - b.get<0>();
    coord_t y = a.get<1>() - b.get<1>();
    coord_t z = a.get<2>() - b.get<2>();
    return x*x + y*y + z*z;
  


int main() 
  using namespace std;
  using namespace boost::lambda; // _1, _2, bind()

  // read array from stdin
  vector<point_t> points;
  cin.exceptions(ios::badbit); // throw exception on bad input
  while(cin) 
    coord_t x,y,z;
    cin >> x >> y >> z;    
    points.push_back(boost::make_tuple(x,y,z));
  

  // use point value from previous examples
  point_t point(69.06310224, 2.23409409, 50.41979143);
  cout << "point: " << point << endl;  // 1.14s

  // find 10 nearest points using partial_sort() 
  // Complexity: O(N)*log(m) comparisons (O(N)*log(N) worst case for the implementation)
  const size_t m = 10;
  partial_sort(points.begin(), points.begin() + m, points.end(), 
               bind(less<coord_t>(), // compare by distance to the point
                    bind(distance_sq, _1, point), 
                    bind(distance_sq, _2, point)));
  for_each(points.begin(), points.begin() + m, cout << _1 << "\n"); // 1.16s

运行它:

g++ -O3 nearest.cc && (time ./a.out < million_3D_points.txt )
point: (69.0631 2.23409 50.4198)
(69 2 50)
(69 2 51)
(69 3 50)
(69 3 50)
(69 3 51)
(70 2 50)
(70 2 51)
(70 2 51)
(70 3 51)
(69 1 51)

real    0m1.152s
user    0m1.140s
sys 0m0.010s

C++ 中的优先队列(时间 ~1.2 秒)

#include <algorithm>           // make_heap
#include <functional>          // binary_function<>
#include <iostream>

#include <boost/range.hpp>     // boost::begin(), boost::end()
#include <boost/tr1/tuple.hpp> // get<>, tuple<>, cout <<

namespace 
  typedef double coord_t;
  typedef std::tr1::tuple<coord_t,coord_t,coord_t> point_t;

  // calculate distance (squared) between points `a` & `b`
  coord_t distance_sq(const point_t& a, const point_t& b)  
    // boost::geometry::distance() squared
    using std::tr1::get;
    coord_t x = get<0>(a) - get<0>(b);
    coord_t y = get<1>(a) - get<1>(b);
    coord_t z = get<2>(a) - get<2>(b);
    return x*x + y*y + z*z;
  

  // read from input stream `in` to the point `point_out`
  std::istream& getpoint(std::istream& in, point_t& point_out)     
    using std::tr1::get;
    return (in >> get<0>(point_out) >> get<1>(point_out) >> get<2>(point_out));
  

  // Adaptable binary predicate that defines whether the first
  // argument is nearer than the second one to given reference point
  template<class T>
  class less_distance : public std::binary_function<T, T, bool> 
    const T& point;
  public:
    less_distance(const T& reference_point) : point(reference_point) 

    bool operator () (const T& a, const T& b) const 
      return distance_sq(a, point) < distance_sq(b, point);
     
  ;


int main() 
  using namespace std;

  // use point value from previous examples
  point_t point(69.06310224, 2.23409409, 50.41979143);
  cout << "point: " << point << endl;

  const size_t nneighbours = 10; // number of nearest neighbours to find
  point_t points[nneighbours+1];

  // populate `points`
  for (size_t i = 0; getpoint(cin, points[i]) && i < nneighbours; ++i)
    ;

  less_distance<point_t> less_distance_point(point);
  make_heap  (boost::begin(points), boost::end(points), less_distance_point);

  // Complexity: O(N*log(m))
  while(getpoint(cin, points[nneighbours])) 
    // add points[-1] to the heap; O(log(m))
    push_heap(boost::begin(points), boost::end(points), less_distance_point); 
    // remove (move to last position) the most distant from the
    // `point` point; O(log(m))
    pop_heap (boost::begin(points), boost::end(points), less_distance_point);
  

  // print results
  push_heap  (boost::begin(points), boost::end(points), less_distance_point);
  //   O(m*log(m))
  sort_heap  (boost::begin(points), boost::end(points), less_distance_point);
  for (size_t i = 0; i < nneighbours; ++i) 
    cout << points[i] << ' ' << distance_sq(points[i], point) << '\n';  
  

运行它:

$ g++ -O3 nearest.cc && (time ./a.out < million_3D_points.txt )

point: (69.0631 2.23409 50.4198)
(69 2 50) 0.235007
(69 2 51) 0.395424
(69 3 50) 0.766819
(69 3 50) 0.766819
(69 3 51) 0.927236
(70 2 50) 1.1088
(70 2 51) 1.26922
(70 2 51) 1.26922
(70 3 51) 1.80103
(69 1 51) 1.86361

real    0m1.174s
user    0m1.180s
sys 0m0.000s

基于线性搜索的方法(时间 ~1.15 秒)

// $ g++ -O3 nearest.cc && (time ./a.out < million_3D_points.txt )
#include <algorithm>           // sort
#include <functional>          // binary_function<>
#include <iostream>

#include <boost/foreach.hpp>
#include <boost/range.hpp>     // begin(), end()
#include <boost/tr1/tuple.hpp> // get<>, tuple<>, cout <<

#define foreach BOOST_FOREACH

namespace 
  typedef double coord_t;
  typedef std::tr1::tuple<coord_t,coord_t,coord_t> point_t;

  // calculate distance (squared) between points `a` & `b`
  coord_t distance_sq(const point_t& a, const point_t& b);

  // read from input stream `in` to the point `point_out`
  std::istream& getpoint(std::istream& in, point_t& point_out);    

  // Adaptable binary predicate that defines whether the first
  // argument is nearer than the second one to given reference point
  class less_distance : public std::binary_function<point_t, point_t, bool> 
    const point_t& point;
  public:
    explicit less_distance(const point_t& reference_point) 
        : point(reference_point) 
    bool operator () (const point_t& a, const point_t& b) const 
      return distance_sq(a, point) < distance_sq(b, point);
     
  ;


int main() 
  using namespace std;

  // use point value from previous examples
  point_t point(69.06310224, 2.23409409, 50.41979143);
  cout << "point: " << point << endl;
  less_distance nearer(point);

  const size_t nneighbours = 10; // number of nearest neighbours to find
  point_t points[nneighbours];

  // populate `points`
  foreach (point_t& p, points)
    if (! getpoint(cin, p))
      break;

  // Complexity: O(N*m)
  point_t current_point;
  while(cin) 
    getpoint(cin, current_point); //NOTE: `cin` fails after the last
                                  //point, so one can't lift it up to
                                  //the while condition

    // move to the last position the most distant from the
    // `point` point; O(m)
    foreach (point_t& p, points)
      if (nearer(current_point, p)) 
        // found point that is nearer to the `point` 

        //NOTE: could use insert (on sorted sequence) & break instead
        //of swap but in that case it might be better to use
        //heap-based algorithm altogether
        std::swap(current_point, p);
  

  // print results;  O(m*log(m))
  sort(boost::begin(points), boost::end(points), nearer);
  foreach (point_t p, points)
    cout << p << ' ' << distance_sq(p, point) << '\n';  


namespace 
  coord_t distance_sq(const point_t& a, const point_t& b)  
    // boost::geometry::distance() squared
    using std::tr1::get;
    coord_t x = get<0>(a) - get<0>(b);
    coord_t y = get<1>(a) - get<1>(b);
    coord_t z = get<2>(a) - get<2>(b);
    return x*x + y*y + z*z;
  

  std::istream& getpoint(std::istream& in, point_t& point_out)     
    using std::tr1::get;
    return (in >> get<0>(point_out) >> get<1>(point_out) >> get<2>(point_out));
  

测量结果表明,大部分时间都花在从文件中读取数组上,实际计算所需的时间要少几个数量级。

【讨论】:

写得真好。为了抵消文件读取,我已经在执行 100 次的搜索周围循环运行你的 python 实现(每次查看不同的点并只构建一次 kd-tree)。蛮力还是赢了。让我挠了挠头。但后来我检查了你的叶子大小,你有一个错误 - 你将叶子大小设置为 1000001,这不会很好。将 leafsize 设置为 10 后,kd 获胜(100 分从 35 秒到 70 秒,其中大部分 35 秒用于构建树,100 次检索 10 分需要一秒钟)。 因此,作为结论,如果您可以预先计算 kd-tree,它将在数量级上胜过蛮力(更不用说对于非常大的数据集,如果您有一棵树,您将不会必须读取内存中的所有数据)。 @goran:如果我将 Leafsize 设置为 10,那么查询一个点需要大约 10 秒(而不是 1 秒)。我同意如果任务是查询多个 (>10) 点,那么 kd-tree 应该会获胜。 @Unreason:上面基于优先队列和线性搜索的实现不会读取内存中的所有数据。【参考方案2】:

对于任意两点 P1 (x1, y1, z1) 和 P2 (x2, y2, z2),如果两点之间的距离为 d,则以下所有条件都必须为真:

|x1 - x2| <= d 
|y1 - y2| <= d
|z1 - z2| <= d

在迭代整个集合时保持最近的 10 个,但也要保持与第 10 个最近的距离。在计算您所看到的每个点的距离之前,使用这三个条件可以节省大量复杂性。

【讨论】:

【参考方案3】:

这个问题需要进一步定义。

1) 关于预先索引数据的算法的决定变化很大,这取决于您是否可以将整个数据保存在内存中。

使用 kdtrees 和 octrees,您将不必将数据保存在内存中,并且可以从这一事实中获得性能优势,这不仅是因为内存占用更少,而且仅仅是因为您不必读取整个文件。

使用 bruteforce,您必须读取整个文件并重新计算要搜索的每个新点的距离。

不过,这对你来说可能并不重要。

2) 另一个因素是您必须搜索一个点的次数。

正如 JF Sebastian 所说,有时暴力破解甚至在大型数据集上速度更快,但请注意他的基准测试衡量的是从磁盘读取整个数据集(一旦构建了 kd-tree 或 octree 并且写在某处)并且他们只测量一次搜索。

【讨论】:

【参考方案4】:

我认为这是一个棘手的问题,可以测试您是否尝试过分做事。

考虑上面人们已经给出的最简单的算法:保留一个包含 10 个迄今为止最好的候选者的表格,并逐个遍历所有点。如果您找到比迄今为止十个最佳点中的任何一个更近的点,请将其替换。有什么复杂性?好吧,我们必须查看文件中的每个点一次,计算它的距离(或实际距离的平方)并与第 10 个最近点进行比较。如果更好,请将其插入到 10-best-so-far 表中的适当位置。

那么复杂性是什么?我们看每个点一次,所以它是距离的 n 次计算和 n 次比较。如果点更好,我们需要将它插入到正确的位置,这需要更多的比较,但它是一个常数因子,因为最佳候选表的大小是常数 10。

我们最终得到了一个在线性时间内运行的算法,点数为 O(n)。

但现在考虑一下这种算法的下限是多少?如果输入数据中没有顺序,我们必须查看每个点,看看它是否不是最接近的点之一。据我所知,下限是 Omega(n),因此上述算法是最优的。

【讨论】:

好点!由于您必须逐个读取文件才能构建任何数据结构,因此正如您所说,您的最低 可能 是 O(n) 。只有当问题要求重复找到最接近的 10 个点时,其他任何事情才重要!我认为你解释得很好。【参考方案5】:

无需计算距离。只是距离的平方应该满足您的需求。我认为应该更快。换句话说,您可以跳过sqrt 位。

【讨论】:

【参考方案6】:

这个问题本质上是在测试您对space partitioning algorithms 的知识和/或直觉。我会说将数据存储在octree 中是您最好的选择。它通常用于处理此类问题的 3d 引擎(存储数百万个顶点、光线追踪、查找碰撞等)。在最坏的情况下(我相信),查找时间大约为log(n)

【讨论】:

【参考方案7】:

简单的算法:

将点存储为元组列表,然后扫描这些点,计算距离并保持“最近”列表。

更多创意:

将点分组为区域(例如由“0,0,0”到“50,50,50”或“0,0,0”到“-20,-20,-20”描述的立方体) ,因此您可以从目标点“索引”到它们。检查目标所在的立方体,只搜索该立方体中的点。如果该立方体中的点少于 10 个,则检查“相邻”立方体,依此类推。

进一步考虑,这不是一个很好的算法:如果您的目标点距离立方体壁的距离超过 10 个点,那么您也必须搜索相邻的立方体。

我会使用 kd-tree 方法并找到最近的,然后删除(或标记)该最近的节点,并重新搜索新的最近节点。冲洗并重复。

【讨论】:

【参考方案8】:

计算它们每个的距离,并在 O(n) 时间内执行 Select(1..10, n)。我猜这将是天真的算法。

【讨论】:

【参考方案9】:

您可以将这些点存储在k-dimensional tree(kd-tree)中。 Kd 树针对最近邻搜索进行了优化(找到最接近给定点的 n 个点)。

【讨论】:

我认为这里需要一个八叉树。 构建 K-d 树所需的复杂性将高于对 10 个壁橱点进行线性搜索所需的复杂性。当您要对一个点集进行许多查询时,k-d 树的真正威力就出现了。 kd-tree 在现实生活中可能比蛮力方法要慢 ***.com/questions/2486093/… 这是我在采访中给出的答案。面试官使用不太精确的语言并不少见,而字里行间的阅读似乎最有可能是他们正在寻找的。事实上,如果我是面试官,并且有人给出了答案“我会以任何旧顺序存储分数,并进行线性扫描以找到 10 分”并根据我不精确的措辞来证明这个答案是正确的,我会很不满意. @Jason Orendorff:我肯定会在技术面试中讨论使用 kd-tree 来解决这样的问题;但是,我还要解释为什么对于给定的特定问题,更简单的线性搜索方法不仅会渐近更快,而且运行速度也会更快。这将显示对算法复杂性、数据结构知识以及考虑问题不同解决方案的能力的更深入理解。【参考方案10】:

基本上是我上面前两个答案的组合。由于这些点在文件中,因此无需将它们保存在内存中。我会使用最大堆而不是数组或最小堆,因为您只想检查小于第 10 个最近点的距离。对于数组,您需要将每个新计算的距离与您保留的所有 10 个距离进行比较。对于最小堆,您必须对每个新计算的距离执行 3 次比较。使用最大堆,当新计算的距离大于根节点时,您只执行 1 次比较。

【讨论】:

【参考方案11】:

如果文件中已经有数百万个条目,则无需将它们全部加载到内存中的数据结构中。只需保留一个包含迄今为止找到的前十个点的数组,然后扫描超过一百万个点,随时更新您的前十名列表。

这是 O(n) 的点数。

【讨论】:

这会很好用,但数组不是最有效的数据存储,因为您必须在每一步都检查它,或者保持排序,这可能很麻烦。大卫关于最小堆的回答为你做了这些事情,但在其他方面是相同的解决方案。当用户只想要10分时,这些顾虑可以忽略不计,但当用户突然想要最接近的100分时,你就麻烦了。 @Karl:问题指定10分。我认为包括这个细节是提问者故意的。因此,Will 描述了一个非常好的解决方案。 @Karl,编译器和 CPU 在优化最愚蠢的循环以击败最聪明的算法方面的表现常常令人惊讶。当循环可以在片上 ram 中运行时,永远不要低估所获得的加速。 文件中还没有百万个条目 - 您可以选择如何将它们存储在文件中。 :) 这种关于如何存储它的选择意味着您还可以预先计算任何随附的索引结构。 Kd-tree 将获胜,因为它根本不需要读取整个文件 我已经发布了你的答案***.com/questions/2486093/… 的实现(虽然我使用堆而不是线性搜索,这对于任务来说完全没有必要)【参考方案12】:

这不是家庭作业,是吗? ;-)

我的想法:遍历所有点并将它们放入最小堆或有界优先级队列中,以与目标的距离为键。

【讨论】:

当然,但不知道目标是什么。 :)

以上是关于数百万个 3D 点:如何找到最接近给定点的 10 个?的主要内容,如果未能解决你的问题,请参考以下文章

寻找附近点的算法?

快速方法按距离搜索数百万个坐标

找到最接近一组圆的点

最接近 3D 空间中多条线的 3d 点

如何使用纯 Redis 原子地删除数百万个匹配模式的键?

比较数百万个 mongoDB 记录中的变化的最佳方法