识别 Apache-Flink 中哪个对象不可序列化

Posted

技术标签:

【中文标题】识别 Apache-Flink 中哪个对象不可序列化【英文标题】:Identify which object is not serializable in Apache-Flink 【发布时间】:2019-02-01 18:32:58 【问题描述】:

我正在编写一个 Flink 转换器,我有一个自定义对象 Histogram,具有以下属性:

case class Histogram(
  nRows: Int,
  nCols: Int,
  min: Int,
  step: Double,
  private val countMatrix: Array[ArrayBuffer[Double]],
  private val cutMatrixL1: Array[ArrayBuffer[Double]],
  val distribMatrixL1: Array[ArrayBuffer[Map[Int, Double]]],
  private val distribMatrixL2: Array[ArrayBuffer[Map[Int, Double]]],
  private val cutMatrixL2: ArrayBuffer[ArrayBuffer[Double]])
  extends Serializable 
    ???

这是我的FitOperation

implicit val fitOp = new FitOperation[PIDiscretizerTransformer, LabeledVector] 
    override def fit(
                      instance: PIDiscretizerTransformer,
                      fitParameters: ParameterMap,
                      input: DataSet[LabeledVector]): Unit = 

      // get params...

      val metric = input.map  x ⇒
        // (instance, histrogram totalCount)
        (x, Histogram(nAttrs, l1InitialBins, min, instance.step), 1)
      .reduce  (m1, m2) ⇒
        // Update Layer 1
        val updatedL1 = updateL1(m1._1, m1._2, instance.step, initialElems, alpha, m1._3)

        //         Update Layer 2 if neccesary
        val updatedL2 = if (m1._3 % l2updateExamples == 0) 
          updateL2(m1._1, updatedL1)
         else updatedL1

        (m2._1, updatedL2, m1._3 + 1)
      .map(_._2)

      //      instance.metricsOption = Some(metric)
    
  

这很好用,但如果我取消注释最后一行:instance.metricsOption = Some(metric) 我会得到一个java.io.NotSerializableException: org.apache.flink.api.scala.DataSet

如何找到我的班级Histogram 中的哪个对象导致了问题?据我所知ArrayBuffer 是可序列化的,Map 也是。虽然我发现了这个 SO 问题:

Map can not be serializable in scala?

上面写着.mapValues 不可序列化,但我没有在任何地方使用.mapValues

【问题讨论】:

你能告诉我完整的代码吗?您可能会在fitOp 在其闭包中捕获DataSet 实例的上下文中创建。 当然,这里是:直方图:github.com/elbaulp/DPASF/blob/master/dpasf/src/main/scala/com/… 变压器:github.com/elbaulp/DPASF/blob/master/dpasf/src/main/scala/com/… 【参考方案1】:

问题是您在MapFunction 中引用了instance.stepinstance 属于 PIDiscretizerTransformer 类型,无法序列化。因此,您需要在 MapFunction 之外计算 step 并将值传递给函数。那么你的程序应该是可序列化的。

【讨论】:

谢谢!就是这样。在提出问题之前,删除了地图中对 instance.step 的调用,但忘记了我在下一次调用 (reduce) 中也使用了它,所以我仍然收到错误。

以上是关于识别 Apache-Flink 中哪个对象不可序列化的主要内容,如果未能解决你的问题,请参考以下文章

使用不可序列化的对象序列化 hashmap

Apache-Flink深度解析-State

是否允许在 React 上下文中存储不可序列化的对象?

如何将不可解析和不可序列化的对象插入 Bundle

哪个更适合用于序列化对象? JsonSerializer 或 JsonConvert

Apache-Flink 1.11 无法在 Java Flink Streamming Job 中通过 SQL Function DDL 使用 Python UDF