Pyspark 和错误“TypeError:必须是实数,而不是 Column”,当尝试在窗口上使用定义的函数查找指南针方位时
Posted
技术标签:
【中文标题】Pyspark 和错误“TypeError:必须是实数,而不是 Column”,当尝试在窗口上使用定义的函数查找指南针方位时【英文标题】:Pyspark and error "TypeError: must be real number, not Column", when attempting to find compass bearing using a defined function over a window 【发布时间】:2020-10-13 17:59:37 【问题描述】:我正在尝试在 pyspark 中的一个窗口上(使用 Databricks 的社区版)应用一个函数来查找两点之间的罗盘方位。我对火花很陌生,我确定我错过了一些东西。
我有一个看起来像这样的测试数据框(从类似问题中借用并添加了一个名为“bin”的列):
Timestamp,User,Latitude,Longitude,bin,BID
1462838468,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1462838512,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1462838389,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1462838497,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1465975885,6E9E0581E2A032FD8,37.118362,-8.205041,A1,6E9E0581E2A032FD8 A1
1457723815,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1457897289,405C238E25FE0B9E7,37.177922,-7.447443,A1,405C238E25FE0B9E7 A1
1457899229,405C238E25FE0B9E7,37.177922,-7.447443,A1,405C238E25FE0B9E7 A1
1457972626,405C238E25FE0B9E7,37.18059,-7.46128,A1,405C238E25FE0B9E7 A1
1458062553,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1458241825,405C238E25FE0B9E7,37.178172,-7.444512,A1,405C238E25FE0B9E7 A1
1458244457,405C238E25FE0B9E7,37.178172,-7.444512,A1,405C238E25FE0B9E7 A1
1458412513,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1458412292,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1465197963,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465202192,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923817,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923766,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923748,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923922,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
这些是库:
from pyspark.sql.functions import acos, cos, sin, lit, toRadians
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import math
from pyspark.sql.functions import concat, col, lit, lag, udf
from pyspark.sql.types import LongType
这是函数:
def direction_lookup(destination_x, origin_x, destination_y, origin_y):
deltaX = destination_x - origin_x
deltaY = destination_y - origin_y
degrees_temp = math.atan2(deltaX, deltaY)/math.pi*180
if degrees_temp < 0:
degrees_final = 360 + degrees_temp
else:
degrees_final = degrees_temp
return degrees_final`
我使用“User”和“bin”(我将其命名为“BID”)的串联列定义窗口,并按时间戳排序:
w = Window().partitionBy("BID").orderBy("Timestamp")
然后我尝试计算每个时间戳之间的方位角,如下所示:
bearing_df = df2.withColumn("bearing", bearing("Longitude", lag("Longitude", 1).over(w), "Latitude", lag("Latitude", 1).over(w)))
如果我只使用坐标,这个函数就可以工作,所以我在 spark 中应用它的方式有问题。我还成功地使用了相同的工作流程来计算点之间的距离(显然,使用不同的函数)。我尝试将它注册为 UDF,但我得到了同样的错误,“TypeError: must be real number, not Column”。我不确定接下来要尝试什么,而且我对 pyspark 很陌生。请帮忙!
【问题讨论】:
【参考方案1】:数据框中的列属于列类型。当您传递值函数时,函数期望的是数字类型而不是列数据类型。
您可以在传递给函数之前转换值
from pyspark.sql.types import IntegerType
data_df = data_df.withColumn("samplecol", data_df["samplecol"].cast(IntegerType()))
【讨论】:
我曾怀疑过这一点,并在早些时候尝试过。刚刚再试了一次,我仍然收到与此代码相同的错误:bearing_df = df2.withColumn("bearing", bearing(df2["Longitude"].cast(DoubleType()), lag(df2["Longitude"].cast(DoubleType()), 1).over(w), df2["Latitude"].cast(DoubleType()), lag(df2["Latitude"].cast(DoubleType()), 1).over(w)))
【参考方案2】:
从 csv 文件中读取示例数据后,我使用 inferSchema
来消除强制转换:
df = spark.read.option("header", True).option("inferSchema", True).csv(...)
df
现在有了架构
root
|-- Timestamp: integer (nullable = true)
|-- User: string (nullable = true)
|-- Latitude: double (nullable = true)
|-- Longitude: double (nullable = true)
|-- bin: string (nullable = true)
|-- BID: string (nullable = true)
接下来可以定义udf
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
bearing = udf(direction_lookup, DoubleType())
最后可以使用问题中的行调用udf。
仅与问题间接相关的两个评论:
函数direction_lookup
的开头应该有一个空检查。组第一行的lag
函数返回None
,否则计算将失败
def direction_lookup(destination_x, origin_x, destination_y, origin_y):
if origin_x is None or origin_y is None:
return None
deltaX = destination_x - origin_x
[...]
窗口规范还需要两行来定义分区 (w = Window().partitionBy("User","bin").orderBy("Timestamp")
),因此 BID
列不是严格要求的
【讨论】:
谢谢,刚看到这个,我明天试试! 这成功了!!!我很高兴,非常感谢你花时间给我留下这么好的答案。我认为是空检查让它发挥作用。以上是关于Pyspark 和错误“TypeError:必须是实数,而不是 Column”,当尝试在窗口上使用定义的函数查找指南针方位时的主要内容,如果未能解决你的问题,请参考以下文章
super() 为新型类引发“TypeError:必须是类型,而不是 classobj”
Azure Databricks 和 pyspark - 子字符串错误