无法在 spark sql 中注册 UDF

Posted

技术标签:

【中文标题】无法在 spark sql 中注册 UDF【英文标题】:Not able to register UDF in spark sql 【发布时间】:2020-05-06 10:33:07 【问题描述】:

我试图注册我的 UDF 函数并想在我的 spark sql 查询中使用它,但无法注册我的 udf 我得到以下错误。

    val squared = (s: Column) =>  
    concat(substring(s,4,2),year(to_date(from_unixtime(unix_timestamp(s,"dd-MM-yyyy")))))
    
    squared: org.apache.spark.sql.Column => org.apache.spark.sql.Column = <function1>

    scala> sqlContext.udf.register("dc",squared)
    java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not   supported
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:733)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:671)
    at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:143)
    ... 48 elided

我尝试将 Column 更改为 String 但出现以下错误。

    val squared = (s: String) =>  
    | concat(substring(s,4,2),year(to_date(from_unixtime(unix_timestamp(s,"dd-MM-yyyy")))))
    | 
    <console>:28: error: type mismatch;
    found   : String
    required: org.apache.spark.sql.Column
   concat(substring(s,4,2),year(to_date(from_unixtime(unix_timestamp(s,"dd-MM-yyyy")))))


   can someone please guide me how should i implement this.

【问题讨论】:

AFAIK 用户定义的 UDF 只能有原始数据类型参数和返回类型为 String、Long、Array ......但不是 Spark 的列 【参考方案1】:

此包 org.apache.spark.sql.functions._ 中的所有 spark 函数将无法在 UDF 中访问。

代替内置的 spark 函数..您可以使用普通的 scala 代码来获得相同的结果。

val df = spark.sql("select * from your_table")

def date_concat(date:Column): Column =  
    concat(substring(date,4,2),year(to_date(from_unixtime(unix_timestamp(date,"dd-MM-yyyy")))))


df.withColumn("date_column_name",date_concat($"date_column_name")) // with function.
df.withColumn("date_column_name",concat(substring($"date_column_name",4,2),year(to_date(from_unixtime(unix_timestamp($"date_column_name","dd-MM-yyyy")))))) // without function, direct method.
df.createOrReplaceTempView("table_name")
spark.sql("[...]") // Write your furthur logic in sql if you want.

【讨论】:

嗨@Srinivas你能指导我或给我一些提示吗?我是第一次使用UDF的scala新手我在scala def date_concat(idt:Column)中创建了一个方法:Column = val a1=concat(substring(idt,4,2),year(to_date(from_unixtime(unix_timestamp(idt,"dd-MM-yyyy"))))) a1 并在 sparksql 查询和传递表中使用此方法名称列名作为参数。但它对我不起作用 简单的方法是使用没有 spark.sql 的 DF 并转换所有需要的日期并创建临时表并像普通 SQL 查询一样使用。 早上我已经回答了同样的问题。 我已经更新了答案中的代码,您可以在没有 UDF 的情况下执行类似操作。 根据你的问题,它不可能在 UDF 中使用函数,更新了解决方法的答案..如果不起作用,请告诉我..

以上是关于无法在 spark sql 中注册 UDF的主要内容,如果未能解决你的问题,请参考以下文章

Spark 2.1 UDF 未在 Spark Jar 中注册

在 sparkSession 上注册两个同名 udf

spark的udf和udaf的注册

注册匿名类功能

Apache Spark - 注册 UDF - 返回数据帧

Spark SQL UDF示例