任务在 Databricks 上的 Scala 中不可序列化

Posted

技术标签:

【中文标题】任务在 Databricks 上的 Scala 中不可序列化【英文标题】:Task not serializable in Scala on Databricks 【发布时间】:2020-08-30 13:37:02 【问题描述】:

我正在尝试使用 Scala 在 Databricks 中实现 UDF 功能。即使在将函数封装在一个类中并继承了 Serializable 类之后,也会出现 Task not serializable 错误。请参考以下代码:

var rkList = List[String]("")

class appendData extends Serializable
  var cKey = ""

  def addKey(data:String):String=
    if(data=="")
    
      return cKey
    
    else
    
      cKey=data
      return cKey
    
  

  def execute(dframe: DataFrame): DataFrame =
    val keyAddUDF = udf[String, String](addKey)

    var df = dframe.withColumn("r_c",substring(col("val"),0,6))
    df = df.withColumn("r_k",when(col("r_c")===kHolder, substring(col("val"),pos,len)).otherwise(""))
    rkList = df.select(col("r_k")).distinct.collect.map(_(0).toString).toList.filter(_ != "")

    return df.withColumn("val",concat(col("val"),keyAddUDF(col("r_k")))).drop("r_k","r_c")
  



df = (new appendData).execute(df)

【问题讨论】:

【参考方案1】:

您不应该将execute 方法和udf 方法放在同一个类中。单独定义addKey函数,如:


def addKey(data:String): String = 
    var rkList = List[String]("")
    var cKey = ""

    if(data=="") 
      return cKey
     else 
      cKey=data
      return cKey
    


val keyAddUDF = udf[String, String](addKey)


def transformDf(dframe: DataFrame): DataFrame =
    var df = dframe.withColumn("r_c",substring(col("val"),0,6))
    df = df.withColumn("r_k",when(col("r_c")===kHolder, substring(col("val"),pos,len)).otherwise(""))
    rkList = df.select(col("r_k")).distinct.collect.map(_(0).toString).toList.filter(_ != "")
    return df.withColumn("val",concat(col("val"),keyAddUDF(col("r_k")))).drop("r_k","r_c")
  


df = transformDf(df)


【讨论】:

以上是关于任务在 Databricks 上的 Scala 中不可序列化的主要内容,如果未能解决你的问题,请参考以下文章

使用 Python/Scala 的 Databricks 雪花表

Azure Databricks 上的最大消息大小

在 Databricks 中将字符串从 SCALA 传递到 Python

注册函数时Databricks SCALA UDF无法加载类

使用 databricks 在 Spark(scala) 中生成具有属性和值的 XML

databricks、spark、scala,不能长时间使用 lag()