计算pyspark中每个起点到目的地的最小距离

Posted

技术标签:

【中文标题】计算pyspark中每个起点到目的地的最小距离【英文标题】:Calculate the minimum distance to destinations for each origin in pyspark 【发布时间】:2021-08-01 21:18:27 【问题描述】:

我有一个起点和终点的列表以及它们的地理坐标。我需要计算每个起点到目的地的最短距离。

下面是我的代码:

import pyspark.sql.functions as F
from haversine import haversine_vector, Unit

data1 = [("A", (45.7597, 4.8422)), ("B", (46.7431, 5.8422))]
columns1 = ["Origin", "Origin_Geo"]
df1 = spark.createDataFrame(data=data1, schema=columns1)
data2 = [("Destin1", (48.8567, 2.3508)), ("Destin2", (40.7033962, -74.2351462))]
columns2 = ["Destination", "Destination_Geo"]
df2 = spark.createDataFrame(data=data2, schema=columns2)

df = df1.crossJoin(df2)

df.withColumn(
    "Distance", haversine_vector(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()

我收到如下错误:

IndexError: too many indices for array: array is 0-dimensional, but 2 were indexed

我的问题是:

    withColumn('Distance', haversine_vector(F.col('Origin_Geo'), F.col('Destination_Geo'))) 似乎有问题。我不知道为什么。 (我是 pyspark 的新手..)

    我有一长串出发地和目的地(都超过 30K)。交叉连接会生成许多起点和终点的组合。我想知道是否有更有效的方法来获得最小距离?

非常感谢。

【问题讨论】:

你为什么用haversine_vector而不是haversine 【参考方案1】:

您正在将haversine 函数应用于应该应用于元组或数组的列。

如果你想使用这个库,你需要创建一个 UDF 并在你所有的 spark 节点上安装 hasrsine 包。

from haversine import haversine
from pyspark.sql import functions as F, types as T

haversine_udf = F.udf(haversine, T.FloatType())

df.withColumn(
    "Distance", haversine_udf(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()

如果你不能在每个节点上安装包,那么你可以简单地使用函数的内置版本(参见Haversine Formula in Python (Bearing and Distance between two GPS points)) - 公式严重依赖于半径你选择的地球

from math import radians, cos, sin, asin, sqrt
from pyspark.sql import functions as F, types as T

@F.udf(T.FloatType())
def haversine_udf(point1, point2):
    """
    Calculate the great circle distance between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lon1, lat1 = point1
    lon2, lat2 = point2
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    r = 6372.8  # Radius of earth in kilometers. Use 3956 for miles
    return c * r

df.withColumn(
    "Distance", haversine_udf(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()
+------+------------+                                                           
|Origin|Min_Distance|
+------+------------+
|     B|   351.08905|
|     A|   392.32755|
+------+------------+

【讨论】:

非常感谢!你对我的问题 #2 有什么想法吗? @zesla 交叉连接就是它们。不幸的是,我不知道有什么方法可以降低他们的成本。 @zesla在原包中,地球的半径是6371.0088

以上是关于计算pyspark中每个起点到目的地的最小距离的主要内容,如果未能解决你的问题,请参考以下文章

提高篇:图的最短路径算法和最小生成树算法

确定行进路径 mkmapview swift 上的距离

计算 shapefile 中每个多边形之间的最大/(或最小)距离

Pandas对Pyspark函数的迭代。

高德导航中怎么计算起点和终点之间行程的距离

POJ 3258 River Hopscotch (最大最小距离)二分