Pyspark 和错误“TypeError:必须是实数,而不是 Column”,当尝试在窗口上使用定义的函数查找指南针方位时

Posted

技术标签:

【中文标题】Pyspark 和错误“TypeError:必须是实数,而不是 Column”,当尝试在窗口上使用定义的函数查找指南针方位时【英文标题】:Pyspark and error "TypeError: must be real number, not Column", when attempting to find compass bearing using a defined function over a window 【发布时间】:2020-10-13 17:59:37 【问题描述】:

我正在尝试在 pyspark 中的一个窗口上(使用 Databricks 的社区版)应用一个函数来查找两点之间的罗盘方位。我对火花很陌生,我确定我错过了一些东西。

我有一个看起来像这样的测试数据框(从类似问题中借用并添加了一个名为“bin”的列):

Timestamp,User,Latitude,Longitude,bin,BID
1462838468,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1462838512,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1462838389,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1462838497,49B4361512443A4DA,39.777982,-7.054599,A1,49B4361512443A4DA A1
1465975885,6E9E0581E2A032FD8,37.118362,-8.205041,A1,6E9E0581E2A032FD8 A1
1457723815,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1457897289,405C238E25FE0B9E7,37.177922,-7.447443,A1,405C238E25FE0B9E7 A1
1457899229,405C238E25FE0B9E7,37.177922,-7.447443,A1,405C238E25FE0B9E7 A1
1457972626,405C238E25FE0B9E7,37.18059,-7.46128,A1,405C238E25FE0B9E7 A1
1458062553,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1458241825,405C238E25FE0B9E7,37.178172,-7.444512,A1,405C238E25FE0B9E7 A1
1458244457,405C238E25FE0B9E7,37.178172,-7.444512,A1,405C238E25FE0B9E7 A1
1458412513,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1458412292,405C238E25FE0B9E7,37.177322,-7.426781,A1,405C238E25FE0B9E7 A1
1465197963,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465202192,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923817,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923766,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923748,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2
1465923922,6E9E0581E2A032FD8,37.118362,-8.205041,B2,6E9E0581E2A032FD8 B2

这些是库:

from pyspark.sql.functions import acos, cos, sin, lit, toRadians

from pyspark.sql import functions as F

from pyspark.sql.window import Window

import math

from pyspark.sql.functions import concat, col, lit, lag, udf

from pyspark.sql.types import LongType

这是函数:

def direction_lookup(destination_x, origin_x, destination_y, origin_y):

deltaX = destination_x - origin_x

deltaY = destination_y - origin_y

degrees_temp = math.atan2(deltaX, deltaY)/math.pi*180

if degrees_temp < 0:

    degrees_final = 360 + degrees_temp

else:

    degrees_final = degrees_temp


return degrees_final`

我使用“User”和“bin”(我将其命名为“BID”)的串联列定义窗口,并按时间戳排序:

w = Window().partitionBy("BID").orderBy("Timestamp")

然后我尝试计算每个时间戳之间的方位角,如下所示:

bearing_df = df2.withColumn("bearing", bearing("Longitude", lag("Longitude", 1).over(w), "Latitude", lag("Latitude", 1).over(w)))

如果我只使用坐标,这个函数就可以工作,所以我在 spark 中应用它的方式有问题。我还成功地使用了相同的工作流程来计算点之间的距离(显然,使用不同的函数)。我尝试将它注册为 UDF,但我得到了同样的错误,“TypeError: must be real number, not Column”。我不确定接下来要尝试什么,而且我对 pyspark 很陌生。请帮忙!

【问题讨论】:

【参考方案1】:

数据框中的列属于列类型。当您传递值函数时,函数期望的是数字类型而不是列数据类型。

您可以在传递给函数之前转换值

from pyspark.sql.types import IntegerType
data_df = data_df.withColumn("samplecol", data_df["samplecol"].cast(IntegerType()))

【讨论】:

我曾怀疑过这一点,并在早些时候尝试过。刚刚再试了一次,我仍然收到与此代码相同的错误:bearing_df = df2.withColumn("bearing", bearing(df2["Longitude"].cast(DoubleType()), lag(df2["Longitude"].cast(DoubleType()), 1).over(w), df2["Latitude"].cast(DoubleType()), lag(df2["Latitude"].cast(DoubleType()), 1).over(w)))【参考方案2】:

从 csv 文件中读取示例数据后,我使用 inferSchema 来消除强制转换:

df = spark.read.option("header", True).option("inferSchema", True).csv(...)

df 现在有了架构

root
 |-- Timestamp: integer (nullable = true)
 |-- User: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- bin: string (nullable = true)
 |-- BID: string (nullable = true)

接下来可以定义udf

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
bearing = udf(direction_lookup, DoubleType())

最后可以使用问题中的行调用udf。


仅与问题间接相关的两个评论:

函数direction_lookup 的开头应该有一个空检查。组第一行的lag 函数返回None,否则计算将失败
def direction_lookup(destination_x, origin_x, destination_y, origin_y):
    if origin_x is None or origin_y is None:
        return None
    deltaX = destination_x - origin_x
    [...]
窗口规范还需要两行来定义分区 (w = Window().partitionBy("User","bin").orderBy("Timestamp")),因此 BID 列不是严格要求的

【讨论】:

谢谢,刚看到这个,我明天试试! 这成功了!!!我很高兴,非常感谢你花时间给我留下这么好的答案。我认为是空检查让它发挥作用。

以上是关于Pyspark 和错误“TypeError:必须是实数,而不是 Column”,当尝试在窗口上使用定义的函数查找指南针方位时的主要内容,如果未能解决你的问题,请参考以下文章

super() 为新型类引发“TypeError:必须是类型,而不是 classobj”

Azure Databricks 和 pyspark - 子字符串错误

pyspark.ml:计算精度和召回时的类型错误

For Loop - TypeError:必须是str,而不是int [duplicate]

Pyspark 和 Cassandra 连接错误

PySpark 安装错误