如何在 Spark SQL(DataFrame)的 UDF 中使用常量值

Posted

技术标签:

【中文标题】如何在 Spark SQL(DataFrame)的 UDF 中使用常量值【英文标题】:How to use constant value in UDF of Spark SQL(DataFrame) 【发布时间】:2015-04-02 07:01:56 【问题描述】:

我有一个包含timestamp 的数据框。要按时间(分钟、小时或天)聚合,我尝试过:

val toSegment = udf((timestamp: String) => 
  val asLong = timestamp.toLong
  asLong - asLong % 3600000 // period = 1 hour
)

val df: DataFrame // the dataframe
df.groupBy(toSegment($"timestamp")).count()

这很好用。

我的问题是如何将 UDF toSegment 概括为

val toSegmentGeneralized = udf((timestamp: String, period: Int) => 
  val asLong = timestamp.toLong
  asLong - asLong % period
)

我尝试了以下方法,但它不起作用

df.groupBy(toSegment($"timestamp", $"3600000")).count()

似乎找到了名为3600000的列。

可能的解决方案是使用常量列,但我找不到。

【问题讨论】:

【参考方案1】:

您可以使用org.apache.spark.sql.functions.lit() 创建常量列:

import org.apache.spark.sql.functions._

df.groupBy(toSegment($"timestamp", lit(3600000))).count()

【讨论】:

如果你有一个字符串或整数要传入,lit 函数非常有用。像数组/列表这样的东西会惨遭失败。关于在那里做什么有什么想法吗? 那个包还有一个叫做array()的函数,你可以用它来组合一堆文字列——我还没试过。为列表创建一个类似的函数可能并不难,特别是如果您查看functions.scala 中的array() 的实现——似乎并不存在。 现在已经尝试使用array(),我应该指出,对于某些T,对应的UDF参数需要是ArrayBuffer[T]类型。 Spark 1.5.0 注意: 传递 array() 现在似乎会导致 WrappedArray 被传递到 UDF。这意味着您可以使 UDF 参数类型类似于 SeqIndexedSeq @SpiroMichaylov 你知道如何传递地图吗?我无法将地图传递给 udf。 ***.com/questions/40598890/…

以上是关于如何在 Spark SQL(DataFrame)的 UDF 中使用常量值的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL - 如何将 DataFrame 写入文本文件?

如何按 Seq[org.apache.spark.sql.Column] 降序排序 spark DataFrame?

如何将 scala spark.sql.dataFrame 转换为 Pandas 数据框

如何将 BigQuery SQL 查询结果转换为 Spark DataFrame?

如何提高具有数组列的 DataFrame 的 Spark SQL 查询性能?

Spark SQL:如何使用 JAVA 从 DataFrame 操作中调用 UDF