在 pyspark 的 Scala UDF 中使用默认参数值?

Posted

技术标签:

【中文标题】在 pyspark 的 Scala UDF 中使用默认参数值?【英文标题】:Using default argument values in Scala UDF from pyspark? 【发布时间】:2018-12-10 09:28:20 【问题描述】:

我在 Scala 中定义了一个带有默认参数值的 UDF,如下所示:

package myUDFs

import org.apache.spark.sql.api.java.UDF3

class my_udf extends UDF3[Int, Int, Int, Int] 

  override def call(a: Int, b: Int, c: Int = 6): Int = 
    c*(a + b)
  

然后我使用 build clean assembly 适当地构建它(如果需要,可以提供更多构建细节)并提取 jar myUDFs-assembly-0.1.1.jar 并将其包含在我的 Python 中的 Spark 配置中:

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import IntType

spark_conf = SparkConf().setAll([
    ('spark.jars', 'myUDFs-assembly-0.1.1.jar')
])

spark = SparkSession.builder \
    .appName('my_app') \
    .config(conf = spark_conf) \
    .enableHiveSupport() \
    .getOrCreate()

spark.udf.registerJavaFunction(
    "my_udf", "myUDFs.my_udf", IntType()
)

但是,当我尝试利用默认值时,我被拒绝了:

spark.sql('select my_udf(1, 2)').collect()

AnalysisException: '函数 my_udf 的参数数量无效。预期:3;发现:2;行 x pos y'

难道不能有这样一个默认值的 UDF 吗?输出应该是6*(1+2) = 18

【问题讨论】:

@user10465355 谢谢,我已经提交了issues.apache.org/jira/browse/SPARK-26331;也许有一天会成为可能 【参考方案1】:

仅查看调用链,此处无法识别默认参数。

Python registerJavaFunction invokes its JVM UDFRegistration.registerJava. registerJavainvokes matching register implementation.

如果是UDF3,looks like this:

 * Register a deterministic Java UDF3 instance as user-defined function (UDF).
 * @since 1.3.0
 */
def register(name: String, f: UDF3[_, _, _, _], returnType: DataType): Unit = 
  val func = f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any)
  def builder(e: Seq[Expression]) = if (e.length == 3) 
    ScalaUDF(func, returnType, e, e.map(_ => true), udfName = Some(name))
   else 
    throw new AnalysisException("Invalid number of arguments for function " + name +
      ". Expected: 3; Found: " + e.length)
  
  functionRegistry.createOrReplaceTempFunction(name, builder)

如您所见,builder 仅在实际调度调用之前验证提供的表达式是否与函数的arity 匹配。

如果实现一个中间 API,它会处理默认参数并在幕后分派到 UDF,您可能会更幸运。但是,这仅适用于 DataFrame API,因此可能不符合您的需求。

【讨论】:

【参考方案2】:

在 spark sql 中调用函数时,您只传递了两个参数。尝试传递三个参数

spark.sql('select my_udf(1, 2, 3 )').collect()

【讨论】:

是的,抱歉,这不是重点。我知道在给定函数定义的情况下我可以传递三个参数,但我认为我也应该能够传递 2(因为参数 c 定义为默认 6 你可以在函数内部初始化变量“C”,为什么要在参数中传递它

以上是关于在 pyspark 的 Scala UDF 中使用默认参数值?的主要内容,如果未能解决你的问题,请参考以下文章

从 Scala 将 UDF 注册到 SqlContext 以在 PySpark 中使用

如何在 Scala Spark 项目中使用 PySpark UDF?

使用 Scala 类作为带有 pyspark 的 UDF

PySpark UDF 优化挑战使用带有正则表达式的字典(Scala?)

udf(用户定义函数)如何在 pyspark 中工作?

pyspark中未定义的函数UDF?