Scala 和 Spark UDF 函数

Posted

技术标签:

【中文标题】Scala 和 Spark UDF 函数【英文标题】:Scala and Spark UDF function 【发布时间】:2016-12-02 15:50:09 【问题描述】:

我制作了一个简单的 UDF 来转换或从 spark 中 temptabl 中的时间字段中提取一些值。我注册了该函数,但是当我使用 sql 调用该函数时,它会引发 NullPointerException。以下是我的功能和执行过程。我正在使用齐柏林飞艇。奇怪的是,这昨天还在工作,但今天早上它停止工作了。

功能

def convert( time:String ) : String = 
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  return sdf.format(time1)

注册函数

sqlContext.udf.register("convert",convert _)

在没有 SQL 的情况下测试函数——这可行

convert(12:12:12) -> returns 12:12

在 Zeppelin 中使用 SQL 测试函数失败。

%sql
select convert(time) from temptable limit 10

temptable 的结构

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- serverip: string (nullable = true)
 |-- request: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- protocol: integer (nullable = true)
 |-- sourceip: string (nullable = true)

我得到的堆栈跟踪的一部分。

java.lang.NullPointerException
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
    at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
    at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)

【问题讨论】:

【参考方案1】:

使用udf而不是直接定义函数

import org.apache.spark.sql.functions._

val convert = udf[String, String](time => 
        val sdf = new java.text.SimpleDateFormat("HH:mm")
        val time1 = sdf.parse(time)
        sdf.format(time1)
    
)

udf 的输入参数是 Column(或 Columns)。并且返回类型是 Column。

case class UserDefinedFunction protected[sql] (
    f: AnyRef,
    dataType: DataType,
    inputTypes: Option[Seq[DataType]]) 

  def apply(exprs: Column*): Column = 
    Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
  

【讨论】:

【参考方案2】:

您必须将您的函数定义为 UDF。

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

val convertUDF: UserDefinedFunction = udf((time:String) => 
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  sdf.format(time1)
)

接下来,您将在 DataFrame 上应用 UDF。

// assuming your DataFrame is already defined
dataFrame.withColumn("time", convertUDF(col("time"))) // using the same name replaces existing

现在,至于您的实际问题,您收到此错误的一个原因可能是您的 DataFrame 包含为空的行。如果您在应用 UDF 之前将它们过滤掉,您应该能够继续没有问题。

dataFrame.filter(col("time").isNotNull)

我很好奇在运行 UDF 时除了遇到 null 之外还有什么会导致 NullPointerException,如果您发现与我的建议不同的原因,我很高兴知道。

【讨论】:

我可以在不同的 scala 类中定义 convertUDF 并在另一个包中导入该 scala 类以使用该 UDF 吗? @SurenderRaja 我没有看到任何反对它的理由。这是一个独立的功能。你有什么特别的困难吗? @SurenderRaja 如果你使用单例对象,可能会更简单。 是的,我创建了一个单例 Scala 对象并在我所有的 UDFS 中定义了一个方法,它可以工作

以上是关于Scala 和 Spark UDF 函数的主要内容,如果未能解决你的问题,请参考以下文章

关于在 Spark Scala 中创建用户定义函数 (UDF)

如何使用scala将特定函数转换为apache spark中的udf函数? [复制]

spark read 在 Scala UDF 函数中不起作用

scala用户定义函数在spark sql中不起作用

整数、长整数或双精度类型作为 Spark UDF 的函数参数

如果其他,Spark scala udf 错误