AnalysisException callUDF() inside withColumn()
Posted
技术标签:
【中文标题】AnalysisException callUDF() inside withColumn()【英文标题】: 【发布时间】:2018-05-15 08:13:39 【问题描述】:今天早上我们将 Spark 版本从 2.2.0 更新到 2.3.0,我遇到了一个很奇怪的问题。
我有一个 UDF(),计算 2 点之间的距离
private static UDF4<Double, Double, Double, Double, Double> calcDistance =
(UDF4<Double, Double, Double, Double, Double>) (lat, lon, meanLat, meanLon) ->
GeoUtils.calculateDistance(lat, lon, meanLat, meanLon);
UDF 的注册
spark.udf().register("calcDistance", calcDistance, DataTypes.DoubleType);
我有一个如下结构的数据框(这个 DF 是 hpan
字段连接 2 个 DF 的结果)
root
|-- hpan: string (nullable = true)
|-- atmid: string (nullable = true)
|-- reqamt: long (nullable = true)
|-- mcc_code: string (nullable = true)
|-- utime: string (nullable = true)
|-- udate: string (nullable = true)
|-- address_city: string (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- gmt_msk_offset: integer (nullable = true)
|-- utimeWithTZ: timestamp (nullable = true)
|-- weekDay: integer (nullable = true)
|-- location_type: string (nullable = true)
|-- mean_lat: double (nullable = true)
|-- mean_lon: double (nullable = true)
所以我想要添加一个距离在 (lat,lon) 和 (mean_lat,mean_lon) 之间的列;
svWithCoordsTzAndDistancesDF.withColumn("distance",
callUDF("calcDistance",col("latitude"), col("longitude"),
col("mean_lat"), col("mean_lon")));
它在 Spark 2.2 上运行良好,但在 v2.3 上开始失败 例外是
线程“主”org.apache.spark.sql.AnalysisException 中的异常: 已解决的属性 'mean_lon,'mean_lat,'longitude,'latitude 缺失 来自 gmt_msk_offset#147,utime#3,longitude#146,地址 ss_city#141,udate#29,mean_lon#371,weekDay#230,reqamt#4L,latitude#145,mean_lat#369,location_type#243,hpan#1,utimeWithTZ#218,mcc_code#14,atmid#9 在操作员'项目 [hpan#1, atmid#9, reqamt# 4L, mcc_code#14, utime#3, udate#29, address_city#141, latitude#145, 经度#146,gmt_msk_offset#147,utimeWithTZ#218,weekDay#230, location_type#243, mean_lat#369, mean_lon#371, 'calcDist ance('纬度,'经度,'mean_lat,'mean_lon)AS距离#509]。 具有相同名称的属性出现在操作中: mean_lon,mean_lat,经度,纬度。请检查是否正确 使用了属性。;;
我尝试像这样在 UDF() 中的 cols 中添加别名
svWithCoordsTzAndDistancesDF.withColumn("distance",
callUDF("calcDistance",col("latitude").as("a"), col("longitude").as("b"), col("mean_lat").as("c"), col("mean_lon").as("d")));
或者用scala序列包装这个cols
svWithCoordsTzAndDistancesDF.withColumn("distance",
callUDF("calcDistance",JavaConverters.collectionAsScalaIterableConverter(Arrays.asList
(col("latitude"), col("longitude"), col("mean_lat"), col("mean_lon")))
.asScala()
.toSeq()));
这些尝试都不能解决问题。
也许有人知道这个问题的解决方法?
转换流程是这样的
ParentDF -> childDF1(as parentDF.groupBy().agg(mean())), childDF2(parentDF.filter('condition')) -> svWithCoordsTzAndDistancesDF (join childDF1 and childDF2).
我认为问题可能出在为此流程构建的执行计划中......
【问题讨论】:
【参考方案1】:这是某种魔法。当我指定列的数据框并添加 select("*")
- 它可以工作。如果有人能解释一下 - 我会非常感激
df = df.select("*")
.withColumn("distance", callUDF("calcDistance",
df.col("latitude"),
df.col("longitude"),
df.col("mean_lat"),
df.col("mean_lon")))
.toDF();
【讨论】:
以上是关于AnalysisException callUDF() inside withColumn()的主要内容,如果未能解决你的问题,请参考以下文章
使用 withColumn 和 callUDF 将列附加到数据框
如何正确处理 spark.sql.AnalysisException