Scala PartialFunction ***

Posted

技术标签:

【中文标题】Scala PartialFunction ***【英文标题】: 【发布时间】:2016-03-13 08:54:51 【问题描述】:

我正在开发一个名为 PySpark Cassandra 的 Scala / Python 库。在其中,我必须以pickle格式处理对象序列化的Python对象,例如保存数据。

我的工作因 stackoverlfow 而失败:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 250 in stage 2.0 failed 4 times, most recent failure: Lost task 250.3 in stage 2.0 (TID 411, sp-prod-adg02.priv.tgho.nl): java.lang.***Error
        at pyspark_cassandra.UnpickledUUIDConverter$$anonfun$convertPF$1.applyOrElse(Pickling.scala:121)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:165)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:166)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:166)
        ...
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:166)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:166)

启动此跟踪的代码是:

object UnpickledUUIDConverter extends TypeConverter[UUID] 
  val tt = typeTag[UUID]
  def targetTypeTag = tt
  def convertPF =  case holder: UUIDHolder => holder.uuid 

位于https://github.com/TargetHolding/pyspark-cassandra/blob/master/src/main/scala/pyspark_cassandra/Pickling.scala#L118(了解更多详细信息和上下文)。

UUIDHolder 类定义为:

class UUIDHolder 
  var uuid: UUID = null

  def __setstate__(values: HashMap[String, Object]): UUID = 
    val i = values.get("int").asInstanceOf[BigInteger]
    val buffer = ByteBuffer.wrap(i.toByteArray())
    uuid = new UUID(buffer.getLong(), buffer.getLong())
    uuid
  

(这个类的奇怪构造是为了与py4j兼容以及Python如何腌制UUID对象)

但我对 Scala 以及 case 块和 PartialFunctions 之间关系的理解相当有限。尤其是我的案例块与https://github.com/scala/scala/blob/2.10.x/src/library/scala/PartialFunction.scala#L166 的关系(我在 Scala 2.10.5 上运行)

使我的情况恶化:) 我很难始终如一地重现错误。它发生在不同节点上的 Spark 作业中,但并非总是如此。我有一个数据集,在保存该数据集时存在问题。但我无法将其固定到数据集中的特定记录。

在任何情况下,我都不希望使用此代码出现 ***。任何帮助将不胜感激!

【问题讨论】:

【参考方案1】:

回答简单的问题:

您的 case 块是一个部分函数文字,如 here 解释的那样,又名模式匹配匿名函数。这是因为convertPF 的返回类型是一个偏函数。

它得到一个applyOrElse,如here 所述,这避免了调用ifDefined 然后apply

堆栈上的OrElse 就是包装pf1 orElse pf2 的内容。它的applyOrElse 实现代表每个PartialFunction。

很长的pfi orElse pfi_++ 链可能会在评估时溢出堆栈,或者orElse_i orElse (orElse_i++ orElse ...)

【讨论】:

感谢@som-snytt 的回答。我是沿着这些思路思考的。但是在这种情况下,我无法理解 orElse 是什么?我希望我的部分函数(case 块)只为 UUIDHolder 类型的对象定义,并且在使用其他东西调用时会引发 MatchError 或类似的东西(isDefinedAt 将为该输入返回 false)。我真的不知道递归会从哪里来。 刚和一位同事进行了一次橡皮鸭会议,这很有帮助。这确实是一长串 orElse 调用。由多次注册 UnpickledUUIDConverter 引起(在 Spark Cassandra 连接器中),因为类型转换器是使用 orElse 构造调用的,并且转换器列表过长会引发 ***。

以上是关于Scala PartialFunction ***的主要内容,如果未能解决你的问题,请参考以下文章

Scala中的偏函数

scala 偏函数

scala偏函数小栗子

Scala--偏函数

Scala快速入门--偏函数

13. Scala函数式编程(高级部分)