从 Scala 将 UDF 注册到 SqlContext 以在 PySpark 中使用

Posted

技术标签:

【中文标题】从 Scala 将 UDF 注册到 SqlContext 以在 PySpark 中使用【英文标题】:Register UDF to SqlContext from Scala to use in PySpark 【发布时间】:2016-07-28 13:17:37 【问题描述】:

是否可以注册一个用 Scala 编写的 UDF(或函数)以在 PySpark 中使用? 例如:

val mytable = sc.parallelize(1 to 2).toDF("spam")
mytable.registerTempTable("mytable")
def addOne(m: Integer): Integer = m + 1
// Spam: 1, 2

在 Scala 中,现在可以进行以下操作:

val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// Spam: 1, 2
// moreSpam: 2, 3

我想在 PySpark 中使用“UDFaddOne”

%pyspark

mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work

背景:我们是一个开发人员团队,一些使用 Scala 编码,一些使用 Python,并希望分享已经编写的函数。也可以将其保存到库中并导入。

【问题讨论】:

【参考方案1】:

据我所知,PySpark 不提供任何等效的 callUDF 函数,因此无法直接访问已注册的 UDF。

这里最简单的解决方案是使用原始 SQL 表达式:

mytable.withColumn("moreSpam", expr("UDFaddOne()".format("spam")))

## OR
sqlContext.sql("SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable")

## OR
mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam")

这种方法相当有限,因此如果您需要支持更复杂的工作流程,您应该构建一个包并提供完整的 Python 包装器。你会在我对Spark: How to map Python with Scala or Java User Defined Functions?的回答中找到并举例说明 UDAF 包装器

【讨论】:

感谢您的回答和其他答案 - 我按照您的建议解决了!【参考方案2】:

以下内容对我有用(基本上是多个地方的摘要,包括 zero323 提供的链接):

在斯卡拉中:

package com.example
import org.apache.spark.sql.functions.udf

object udfObj extends Serializable 
  def createUDF = 
    udf((x: Int) => x + 1)
  

在 python 中(假设 sc 是 spark 上下文。如果您使用的是 spark 2.0,则可以从 spark 会话中获取它):

from py4j.java_gateway import java_import
from pyspark.sql.column import Column

jvm = sc._gateway.jvm
java_import(jvm, "com.example")
def udf_f(col):
    return Column(jvm.com.example.udfObj.createUDF().apply(col))

当然还要确保在 scala 中创建的 jar 是使用 --jars 和 --driver-class-path 添加的

那么这里会发生什么:

我们在一个可序列化对象中创建一个函数,该函数返回 scala 中的 udf(我不是 100% 确定需要 Serializable,对于更复杂的 UDF 来说它是必需的,所以可能是因为它需要传递 java 对象)。

在 python 中,我们使用访问内部 jvm(这是一个私有成员,因此将来可以更改它,但我认为没有办法)并使用 java_import 导入我们的包。 我们访问 createUDF 函数并调用它。这将创建一个具有 apply 方法的对象(scala 中的函数实际上是具有 apply 方法的 java 对象)。 apply 方法的输入是一列。应用列的结果是一个新列,所以我们需要用 Column 方法包装它,以使其可用于 withColumn。

【讨论】:

这看起来确实有点hacky,我不确定我是否会在测试代码之外使用它,但它让我对内部工作有了更多的了解,所以谢谢你! 试图在 pysaprk 2.1.1 上运行,我收到以下错误:spark 代码库中的TypeError: 'Column' object is not callable 似乎您无法实例化 Column 对象 这基本上意味着导入存在问题。 jar 不在类路径中,或者名称错误或类似的东西。

以上是关于从 Scala 将 UDF 注册到 SqlContext 以在 PySpark 中使用的主要内容,如果未能解决你的问题,请参考以下文章

是否可以将字符串注册为 UDF?

Spark 2.1 注册UDF到functionRegistry

注册 UDF 时出现 Spark 错误:不支持 AnyRef 类型的架构

我们如何在 Spark-Scala 和 Cataloging UDF 中注册一个函数以及其他函数?

将 spark.sql 查询转换为 spark/scala 查询

使用 Scala 类作为带有 pyspark 的 UDF