如何在 Spark SQL(DataFrame)的 UDF 中使用常量值
Posted
技术标签:
【中文标题】如何在 Spark SQL(DataFrame)的 UDF 中使用常量值【英文标题】:How to use constant value in UDF of Spark SQL(DataFrame) 【发布时间】:2015-04-02 07:01:56 【问题描述】:我有一个包含timestamp
的数据框。要按时间(分钟、小时或天)聚合,我尝试过:
val toSegment = udf((timestamp: String) =>
val asLong = timestamp.toLong
asLong - asLong % 3600000 // period = 1 hour
)
val df: DataFrame // the dataframe
df.groupBy(toSegment($"timestamp")).count()
这很好用。
我的问题是如何将 UDF toSegment
概括为
val toSegmentGeneralized = udf((timestamp: String, period: Int) =>
val asLong = timestamp.toLong
asLong - asLong % period
)
我尝试了以下方法,但它不起作用
df.groupBy(toSegment($"timestamp", $"3600000")).count()
似乎找到了名为3600000
的列。
可能的解决方案是使用常量列,但我找不到。
【问题讨论】:
【参考方案1】:您可以使用org.apache.spark.sql.functions.lit()
创建常量列:
import org.apache.spark.sql.functions._
df.groupBy(toSegment($"timestamp", lit(3600000))).count()
【讨论】:
如果你有一个字符串或整数要传入,lit 函数非常有用。像数组/列表这样的东西会惨遭失败。关于在那里做什么有什么想法吗? 那个包还有一个叫做array()
的函数,你可以用它来组合一堆文字列——我还没试过。为列表创建一个类似的函数可能并不难,特别是如果您查看functions.scala 中的array()
的实现——似乎并不存在。
现在已经尝试使用array()
,我应该指出,对于某些T
,对应的UDF参数需要是ArrayBuffer[T]
类型。
Spark 1.5.0 注意: 传递 array()
现在似乎会导致 WrappedArray
被传递到 UDF。这意味着您可以使 UDF 参数类型类似于 Seq
或 IndexedSeq
。
@SpiroMichaylov 你知道如何传递地图吗?我无法将地图传递给 udf。 ***.com/questions/40598890/…以上是关于如何在 Spark SQL(DataFrame)的 UDF 中使用常量值的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL - 如何将 DataFrame 写入文本文件?
如何按 Seq[org.apache.spark.sql.Column] 降序排序 spark DataFrame?
如何将 scala spark.sql.dataFrame 转换为 Pandas 数据框
如何将 BigQuery SQL 查询结果转换为 Spark DataFrame?