Spark 数组结构和 UDF

Posted

技术标签:

【中文标题】Spark 数组结构和 UDF【英文标题】:Spark array struct and UDFs 【发布时间】:2018-01-02 19:02:44 【问题描述】:

我有以下架构,我想添加一个名为距离的新列。此列计算每行的两个时间序列之间的距离:time_series1 和 time_series2

|-- websites: struct (nullable = true)
|    |-- _1: integer (nullable = false)
|    |-- _2: integer (nullable = false)
|-- countryId1: integer (nullable = false)
|-- countryId2: integer (nullable = false)
|-- time_series1: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- _1: float (nullable = false)
|    |    |-- _2: date (nullable = true)
|-- time_series2: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- _1: float (nullable = false)
|    |    |-- _2: date (nullable = true)

所以我使用 udf 函数将这个新列定义为:

val step2= step1
  .withColumn("distance",  distanceUDF(col("time_series1"),col("time_series2")))
  .select("websites","countryId1","countryId2","time_series1","time_series2","distance")

和 UDF:

 val distanceUDF  = udf( (ts1:Seq[(Float,_)], ts2:Seq[(Float,_)])=>
                            compute_distance( ts1.map(_._1) , ts2.map(_._1)))

但我在映射上有问题,我不知道如何将数组 (struct (float,date).to 映射到 scala。

Seq[(Float,Date)] 是否等同于 array( struct (float,date)) ? 我有以下例外:

java.lang.ClassCastException: .GenericRowWithSchema cannot be cast to scala.Tuple2

我的问题与这里暴露的 Spark Sql UDF with complex input parameter 不同。我有一个带日期的有序时间序列(我有一个数组,而不仅仅是一个结构类型)

【问题讨论】:

【参考方案1】:

您添加的链接有您问题的答案

结构类型转换为o.a.s.sql.Row

所以你的函数应该有两个 Seq[Row] 参数。 然后你可以使用Row api 来获取浮点数。

在这种情况下,您可能需要使用Datasets。有关嵌套类型的更多信息,您可以观看The Joy of Nested Types

【讨论】:

以上是关于Spark 数组结构和 UDF的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 udf 将空列添加到 Spark 中的复杂数组结构

使用带有结构序列的 Spark UDF

对 SparkSQL 中数组列的每个元素执行 UDF(需要另一个 spark 作业)

Spark UDF:如何在每一行上编写一个 UDF 以提取嵌套结构中的特定值?

如何在 Spark 中创建有状态的 UDF?

结构化流是如何执行 pandas_udf 的?