如何使用反射从scala调用spark UDF?

Posted

技术标签:

【中文标题】如何使用反射从scala调用spark UDF?【英文标题】:How to call spark UDF from scala using reflection? 【发布时间】:2019-06-18 16:27:31 【问题描述】:

我正在构建一个依赖于 java 库的 spark 应用程序。暴露的 Java 接口为

String doSomething(String, Map<String,String>)

我创建了一个 UDF 为

def myfunc(properties: Map[String, String]) = udf((data: String) => 
    ...
    doSomething(data,properties)
)

这个函数可以在 spark shell 中以myfunc(properties)(data) 的形式调用,其中properties 是一个Map,data 是Column 类型。

问题是我需要通过 scala 文件的反射来调用它。我需要做这样的事情:

val c = Class.forName("package.class")
val m = c.getMethod("myfunc",classOf[Map[String,String]])
m.invoke(c.newInstance, someMap)

m.invoke 返回函数本身。如何以及在何处传递 Column 参数?或者有没有其他方法可以将这些属性映射传递给 spark UDF,以便可以通过反射直接调用它?

【问题讨论】:

【参考方案1】:

试试

m.invoke(c.newInstance, someMap).asInstanceOf[UserDefinedFunction].apply(data)

对于data 类型的Column

【讨论】:

以上是关于如何使用反射从scala调用spark UDF?的主要内容,如果未能解决你的问题,请参考以下文章

java,如何在spark 1.4.1中调用UDF [重复]

如何将复杂的 Java 类对象作为参数传递给 Spark 中的 Scala UDF?

rdd.mapPartitions 从 Spark Scala 中的 udf 返回布尔值

Scala 和 Spark UDF 函数

scala用户定义函数在spark sql中不起作用

如何使用scala将特定函数转换为apache spark中的udf函数? [复制]