AnalysisException callUDF() inside withColumn()

Posted

技术标签:

【中文标题】AnalysisException callUDF() inside withColumn()【英文标题】: 【发布时间】:2018-05-15 08:13:39 【问题描述】:

今天早上我们将 Spark 版本从 2.2.0 更新到 2.3.0,我遇到了一个很奇怪的问题。

我有一个 UDF(),计算 2 点之间的距离

private static UDF4<Double, Double, Double, Double, Double> calcDistance =
        (UDF4<Double, Double, Double, Double, Double>) (lat, lon, meanLat, meanLon) ->
                GeoUtils.calculateDistance(lat, lon, meanLat, meanLon);

UDF 的注册

spark.udf().register("calcDistance", calcDistance, DataTypes.DoubleType);

我有一个如下结构的数据框(这个 DF 是 hpan 字段连接 2 个 DF 的结果)

root
|-- hpan: string (nullable = true)
|-- atmid: string (nullable = true)
|-- reqamt: long (nullable = true)
|-- mcc_code: string (nullable = true)
|-- utime: string (nullable = true)
|-- udate: string (nullable = true)
|-- address_city: string (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- gmt_msk_offset: integer (nullable = true)
|-- utimeWithTZ: timestamp (nullable = true)
|-- weekDay: integer (nullable = true)
|-- location_type: string (nullable = true)
|-- mean_lat: double (nullable = true)
|-- mean_lon: double (nullable = true)

所以我想要添加一个距离在 (lat,lon) 和 (mean_lat,mean_lon) 之间的列;

svWithCoordsTzAndDistancesDF.withColumn("distance",
    callUDF("calcDistance",col("latitude"), col("longitude"), 
            col("mean_lat"), col("mean_lon")));

它在 Spark 2.2 上运行良好,但在 v2.3 上开始失败 例外是

线程“主”org.apache.spark.sql.AnalysisException 中的异常: 已解决的属性 'mean_lon,'mean_lat,'longitude,'latitude 缺失 来自 gmt_msk_offset#147,utime#3,longitude#146,地址 ss_city#141,udate#29,mean_lon#371,weekDay#230,reqamt#4L,latitude#145,mean_lat#369,location_type#243,hpan#1,utimeWithTZ#218,mcc_code#14,atmid#9 在操作员'项目 [hpan#1, atmid#9, reqamt# 4L, mcc_code#14, utime#3, udate#29, address_city#141, latitude#145, 经度#146,gmt_msk_offset#147,utimeWithTZ#218,weekDay#230, location_type#243, mean_lat#369, mean_lon#371, 'calcDist ance('纬度,'经度,'mean_lat,'mean_lon)AS距离#509]。 具有相同名称的属性出现在操作中: mean_lon,mean_lat,经度,纬度。请检查是否正确 使用了属性。;;

我尝试像这样在 UDF() 中的 cols 中添加别名

svWithCoordsTzAndDistancesDF.withColumn("distance",
        callUDF("calcDistance",col("latitude").as("a"), col("longitude").as("b"), col("mean_lat").as("c"), col("mean_lon").as("d")));

或者用scala序列包装这个cols

svWithCoordsTzAndDistancesDF.withColumn("distance",
        callUDF("calcDistance",JavaConverters.collectionAsScalaIterableConverter(Arrays.asList
                (col("latitude"), col("longitude"), col("mean_lat"), col("mean_lon")))
                .asScala()
                .toSeq()));

这些尝试都不能解决问题。

也许有人知道这个问题的解决方法?

转换流程是这样的

ParentDF -> childDF1(as parentDF.groupBy().agg(mean())), childDF2(parentDF.filter('condition')) -> svWithCoordsTzAndDistancesDF (join childDF1 and childDF2). 

我认为问题可能出在为此流程构建的执行计划中......

【问题讨论】:

【参考方案1】:

这是某种魔法。当我指定列的数据框并添加 select("*") - 它可以工作。如果有人能解释一下 - 我会非常感激

df = df.select("*")
       .withColumn("distance", callUDF("calcDistance",
                df.col("latitude"),
                df.col("longitude"),
                df.col("mean_lat"),
                df.col("mean_lon")))
      .toDF();

【讨论】:

以上是关于AnalysisException callUDF() inside withColumn()的主要内容,如果未能解决你的问题,请参考以下文章

使用 callUDF 创建链接 UDF 调用的方法

使用 withColumn 和 callUDF 将列附加到数据框

如何正确处理 spark.sql.AnalysisException

AnalysisException:无法解析给定的输入列:

SQL 语句中的 Databricks 错误:AnalysisException:无法解析 '``' 给定的输入列:

PySpark AnalysisException:无法解析列名