使用 Spark DataFrame 的地理过滤器

Posted

技术标签:

【中文标题】使用 Spark DataFrame 的地理过滤器【英文标题】:Geo Filter with Spark DataFrame 【发布时间】:2015-08-15 08:44:11 【问题描述】:

我是使用 spark 的数据帧的新手,有时这很奇怪。 假设我有一个数据框,其中包含带有纬度和经度坐标的日志。

 LogsDataFrame.printSchema :
 root
 |-- lat: double (nullable = false)
 |-- lon: double (nullable = false)
 |-- imp: string (nullable = false)
 |-- log_date: string (nullable = true)
 |-- pubuid: string (nullable = true)

另一方面,我有一个简单的方法

within(lat : Double, long : Double, radius : Double) : Boolean

告诉纬度和经度是否在预定义位置的某个半径内。

现在,我如何过滤不满足的点 Log。我试过了

logsDataFrame.filter(within(logsDF("lat"), logsDF("lon"), RADIUS)

但它不会推断出 Double 而是将 Column 作为类型返回。 我怎样才能得到这个工作? spark 站点中的文档有点简单,我确定我遗漏了一些东西。

感谢您的帮助。

【问题讨论】:

【参考方案1】:

一般来说,您至少需要两件事才能使其发挥作用。首先你必须创建一个UDF 包装within

import org.apache.spark.sql.functions.udf, lit

val withinUDF = udf(within _)

接下来,当调用UDF时,半径应该被标记为文字:

df.where(withinUDF($"lat", $"long", lit(RADIUS)))

由于并非每种类型都可以通过这种方式传递,因此创建包装器和调用 lit 相当乏味,您可能更喜欢柯里化:

def within(radius: Double) = udf((lat: Double, long: Double) => ???)

df.where(within(RADIUS)($"lat", $"long"))

【讨论】:

这太棒了,效果很好。我错过了文字部分。我肯定会用柯里化重写代码。刚开始写Scala。谢谢。

以上是关于使用 Spark DataFrame 的地理过滤器的主要内容,如果未能解决你的问题,请参考以下文章

Spark Dataframe 中的过滤操作

在新列上过滤 Spark DataFrame

从 Spark 中的 DataFrame 中过滤和选择数据

Spark Hive:通过另一个 DataFrame 的列的值过滤一个 DataFrame 的行

Spark SQL Dataframe API - 动态构建过滤条件

在按天分区的数据上过滤 n 天窗口的 spark DataFrame