在 pyspark 的 Scala UDF 中使用默认参数值?
Posted
技术标签:
【中文标题】在 pyspark 的 Scala UDF 中使用默认参数值?【英文标题】:Using default argument values in Scala UDF from pyspark? 【发布时间】:2018-12-10 09:28:20 【问题描述】:我在 Scala 中定义了一个带有默认参数值的 UDF,如下所示:
package myUDFs
import org.apache.spark.sql.api.java.UDF3
class my_udf extends UDF3[Int, Int, Int, Int]
override def call(a: Int, b: Int, c: Int = 6): Int =
c*(a + b)
然后我使用 build clean assembly
适当地构建它(如果需要,可以提供更多构建细节)并提取 jar myUDFs-assembly-0.1.1.jar
并将其包含在我的 Python 中的 Spark 配置中:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import IntType
spark_conf = SparkConf().setAll([
('spark.jars', 'myUDFs-assembly-0.1.1.jar')
])
spark = SparkSession.builder \
.appName('my_app') \
.config(conf = spark_conf) \
.enableHiveSupport() \
.getOrCreate()
spark.udf.registerJavaFunction(
"my_udf", "myUDFs.my_udf", IntType()
)
但是,当我尝试利用默认值时,我被拒绝了:
spark.sql('select my_udf(1, 2)').collect()
AnalysisException: '函数 my_udf 的参数数量无效。预期:3;发现:2;行 x pos y'
难道不能有这样一个默认值的 UDF 吗?输出应该是6*(1+2) = 18
。
【问题讨论】:
@user10465355 谢谢,我已经提交了issues.apache.org/jira/browse/SPARK-26331;也许有一天会成为可能 【参考方案1】:仅查看调用链,此处无法识别默认参数。
PythonregisterJavaFunction
invokes its JVM UDFRegistration.registerJava
.
registerJava
invokes matching register
implementation.
如果是UDF3
,looks like this:
* Register a deterministic Java UDF3 instance as user-defined function (UDF).
* @since 1.3.0
*/
def register(name: String, f: UDF3[_, _, _, _], returnType: DataType): Unit =
val func = f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any)
def builder(e: Seq[Expression]) = if (e.length == 3)
ScalaUDF(func, returnType, e, e.map(_ => true), udfName = Some(name))
else
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 3; Found: " + e.length)
functionRegistry.createOrReplaceTempFunction(name, builder)
如您所见,builder
仅在实际调度调用之前验证提供的表达式是否与函数的arity
匹配。
如果实现一个中间 API,它会处理默认参数并在幕后分派到 UDF,您可能会更幸运。但是,这仅适用于 DataFrame
API,因此可能不符合您的需求。
【讨论】:
【参考方案2】:在 spark sql 中调用函数时,您只传递了两个参数。尝试传递三个参数
spark.sql('select my_udf(1, 2, 3 )').collect()
【讨论】:
是的,抱歉,这不是重点。我知道在给定函数定义的情况下我可以传递三个参数,但我认为我也应该能够传递 2(因为参数c
定义为默认 6
)
你可以在函数内部初始化变量“C”,为什么要在参数中传递它以上是关于在 pyspark 的 Scala UDF 中使用默认参数值?的主要内容,如果未能解决你的问题,请参考以下文章
从 Scala 将 UDF 注册到 SqlContext 以在 PySpark 中使用
如何在 Scala Spark 项目中使用 PySpark UDF?