如何在 Spark 中创建 UDF 以支持自定义谓词

Posted

技术标签:

【中文标题】如何在 Spark 中创建 UDF 以支持自定义谓词【英文标题】:How to create UDF in Spark to support custom predicate 【发布时间】:2017-11-11 10:15:02 【问题描述】:

我有一个数据框,它有一个列表数据类型的字段,需要与交叉连接匹配,条件是如果列表中的任何元素存在于另一个列表中,那么两条记录都应该被视为匹配.

示例。

import org.apache.spark.sql.functions.udf


val df = sc.parallelize(Seq(("one", List(1,34,3)), ("one", List(1,2,3)), ("two", List(1))))
          .toDF("word", "count")

val lsEqual = (xs : (List[Int],List[Int])) => xs._1.find(xs._2.contains(_)).nonEmpty
 val equalList = udf(lsEqual)

但这给了我以下错误

val out =  df.joinWith(df,equalList(df("count"),df("count")),"cross")
java.lang.ClassCastException: $anonfun$1 cannot be cast to scala.Function2
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:97)
at org.apache.spark.sql.expressions.UserDefinedFunction.apply(UserDefinedFunction.scala:56)
... 50 elided

还有其他方法可以创建自定义谓词吗?

【问题讨论】:

【参考方案1】:

您的lsEqual 函数定义似乎有误。 ListSeqArraySpark 数据帧中被视为 WrappedArray。而您将两个columns 传递给lsEqual 函数,这应该是两个变量。

正确的方法应该是

val lsEqual = (xs1 : scala.collection.mutable.WrappedArray[Int], xs2 : scala.collection.mutable.WrappedArray[Int]) => xs1.find(xs2.contains(_)).nonEmpty

这绝对应该消除您面临的错误

【讨论】:

以上是关于如何在 Spark 中创建 UDF 以支持自定义谓词的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 中创建有状态的 UDF?

如何在 WordPress 插件安装中创建 MySql 用户自定义函数?

如何在 Java 中创建一个接受字符串数组的 Spark UDF?

在 Spark 中创建 UDF 时出错

如何在 PySpark ML 中创建自定义 SQLTransformer 以透视数据

在 Python 中创建自定义 Spark RDD