具有多个参数的 pyspark udf

Posted

技术标签:

【中文标题】具有多个参数的 pyspark udf【英文标题】:pyspark udf with multiple arguments 【发布时间】:2021-11-29 23:20:31 【问题描述】:

我正在使用 python 函数来计算给定经度和纬度的两点之间的距离。

def haversine(lon1, lat1, lon2, lat2):

    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])

    newlon = lon2 - lon1
    newlat = lat2 - lat1

    haver_formula = np.sin(newlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(newlon/2.0)**2

    dist = 2 * np.arcsin(np.sqrt(haver_formula))
    miles = 3958 * dist 
    return miles

我的数据框有 4 列 - lat、long、merch_lat、merch_long。

当我创建这样的 UDF 时,它会引发错误。我不知道我哪里错了。

udf_haversine = udf(haversine, FloatType())
data = data.withColumn("distance", udf_haversine("long", "lat", "merch_long","merch_lat"))

错误是:

An error occurred while calling o1499.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure:

如何创建一个接受多列并返回单个值的udf?

【问题讨论】:

我认为像这样将多列传递到您的 UDF 中是有效的。您提供的错误消息非常笼统,可能由于多种原因而发生。你能分享更多关于它的细节吗? medium.com/@nikolasbielski/… 可能有帮助 【参考方案1】:

您可能在numpy.dtype 和序列化方面遇到问题。

由于milesnumpy.float64 类型,请尝试返回float(miles)

有效的完整示例:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
import numpy as np


def haversine(lon1, lat1, lon2, lat2):
    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])
    newlon = lon2 - lon1
    newlat = lat2 - lat1
    haver_formula = (
        np.sin(newlat / 2.0) ** 2
        + np.cos(lat1) * np.cos(lat2) * np.sin(newlon / 2.0) ** 2
    )
    dist = 2 * np.arcsin(np.sqrt(haver_formula))
    miles = 3958 * dist
    return float(miles)


spark = SparkSession.builder.getOrCreate()
data = [
    
        "long": 18.427238,
        "lat": 19.510083,
        "merch_long": 93.710735,
        "merch_lat": 52.182011,
    
]
df = spark.createDataFrame(data)
udf_haversine = F.udf(haversine, DoubleType())
df = df.withColumn("distance", udf_haversine("long", "lat", "merch_long", "merch_lat"))

【讨论】:

以上是关于具有多个参数的 pyspark udf的主要内容,如果未能解决你的问题,请参考以下文章

具有多个接收器的 pyspark 并行处理

具有多个聚合的 pyspark groupBy(如 pandas)

如何在pyspark中加入具有多个重叠的两个数据框

使用 pyspark/pandas 使用列名的模式匹配对具有不同列名的多个列值求和

如何使用具有多个源列的 pandas_udf 将多个列添加到 pyspark DF?

如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列