使用 Scala 类作为带有 pyspark 的 UDF

Posted

技术标签:

【中文标题】使用 Scala 类作为带有 pyspark 的 UDF【英文标题】:Using Scala classes as UDF with pyspark 【发布时间】:2018-04-03 14:04:13 【问题描述】:

在使用 Apache Spark 时,我正在尝试将一些计算从 Python 卸载到 Scala。我想使用 Java 中的类接口来使用持久变量,就像这样(这是基于我更复杂的用例的无意义的 MWE):

package mwe

import org.apache.spark.sql.api.java.UDF1

class SomeFun extends UDF1[Int, Int] 
  private var prop: Int = 0

  override def call(input: Int): Int = 
    if (prop == 0) 
      prop = input
    
    prop + input
  

现在我正在尝试在 pyspark 中使用这个类:

import pyspark
from pyspark.sql import SQLContext
from pyspark import SparkContext

conf = pyspark.SparkConf()
conf.set("spark.jars", "mwe.jar")
sc = SparkContext.getOrCreate(conf)

sqlContext = SQLContext.getOrCreate(sc)
sqlContext.registerJavaFunction("fun", "mwe.SomeFun")

df0 = sc.parallelize((i,) for i in range(6)).toDF(["num"])
df1 = df0.selectExpr("fun(num) + 3 as new_num")
df1.show()

并得到以下异常:

pyspark.sql.utils.AnalysisException: u"cannot resolve '(UDF:fun(num) + 3)' due to data type mismatch: differing types in '(UDF:fun(num) + 3)' (struct<> and int).; line 1 pos 0;\n'Project [(UDF:fun(num#0L) + 3) AS new_num#2]\n+- AnalysisBarrier\n      +- LogicalRDD [num#0L], false\n"

实现这一点的正确方法是什么?我必须求助于Java本身来上课吗?非常感谢您的提示!

【问题讨论】:

【参考方案1】:

异常的来源是使用了不兼容的类型:

首先,o.a.s.sql.api.java.UDF* 对象需要外部 Java(不是 Scala 类型),因此预期整数的 UDF 应该采用装箱的 Integer (java.lang.Integer) 而不是 Int

class SomeFun extends UDF1[Integer, Integer] 
  ...
  override def call(input: Integer): Integer = 
    ...

除非您使用旧版 Python num 列使用 LongType 而不是 IntegerType

df0.printSchema()
root
 |-- num: long (nullable = true)

所以实际的签名应该是

class SomeFun extends UDF1[java.lang.Long, java.lang.Long] 
  ...
  override def call(input: java.lang.Long): java.lang.Long = 
    ...

或者在应用UDF之前应该转换数据

df0.selectExpr("fun(cast(num as integer)) + 3 as new_num")

最后,在 UDF 中不允许使用可变状态。它不会导致异常,但整体行为将是不确定的。

【讨论】:

谢谢!我用 MWE 复制了这个,现在必须调整我的其他代码,以及看看是否有办法绕过可变状态(确定性不是关键)。

以上是关于使用 Scala 类作为带有 pyspark 的 UDF的主要内容,如果未能解决你的问题,请参考以下文章

pyspark对应的scala代码PythonRDD对象

带有 casbah 的 scala 案例类。接受 objectid 参数作为字符串或 objectid

pyspark对应的scala代码PythonRDD类

在 pyspark 的 Scala UDF 中使用默认参数值?

我如何使用 s & $ 访问 Pyspark 中的变量,就像在 Scala 中一样

Spark:如何在 pyspark 或 scala spark 中分解数据并添加列名?