识别 Apache-Flink 中哪个对象不可序列化
Posted
技术标签:
【中文标题】识别 Apache-Flink 中哪个对象不可序列化【英文标题】:Identify which object is not serializable in Apache-Flink 【发布时间】:2019-02-01 18:32:58 【问题描述】:我正在编写一个 Flink 转换器,我有一个自定义对象 Histogram
,具有以下属性:
case class Histogram(
nRows: Int,
nCols: Int,
min: Int,
step: Double,
private val countMatrix: Array[ArrayBuffer[Double]],
private val cutMatrixL1: Array[ArrayBuffer[Double]],
val distribMatrixL1: Array[ArrayBuffer[Map[Int, Double]]],
private val distribMatrixL2: Array[ArrayBuffer[Map[Int, Double]]],
private val cutMatrixL2: ArrayBuffer[ArrayBuffer[Double]])
extends Serializable
???
这是我的FitOperation
:
implicit val fitOp = new FitOperation[PIDiscretizerTransformer, LabeledVector]
override def fit(
instance: PIDiscretizerTransformer,
fitParameters: ParameterMap,
input: DataSet[LabeledVector]): Unit =
// get params...
val metric = input.map x ⇒
// (instance, histrogram totalCount)
(x, Histogram(nAttrs, l1InitialBins, min, instance.step), 1)
.reduce (m1, m2) ⇒
// Update Layer 1
val updatedL1 = updateL1(m1._1, m1._2, instance.step, initialElems, alpha, m1._3)
// Update Layer 2 if neccesary
val updatedL2 = if (m1._3 % l2updateExamples == 0)
updateL2(m1._1, updatedL1)
else updatedL1
(m2._1, updatedL2, m1._3 + 1)
.map(_._2)
// instance.metricsOption = Some(metric)
这很好用,但如果我取消注释最后一行:instance.metricsOption = Some(metric)
我会得到一个java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
如何找到我的班级Histogram
中的哪个对象导致了问题?据我所知ArrayBuffer
是可序列化的,Map 也是。虽然我发现了这个 SO 问题:
Map can not be serializable in scala?
上面写着.mapValues
不可序列化,但我没有在任何地方使用.mapValues
。
【问题讨论】:
你能告诉我完整的代码吗?您可能会在fitOp
在其闭包中捕获DataSet
实例的上下文中创建。
当然,这里是:直方图:github.com/elbaulp/DPASF/blob/master/dpasf/src/main/scala/com/… 变压器:github.com/elbaulp/DPASF/blob/master/dpasf/src/main/scala/com/…
【参考方案1】:
问题是您在MapFunction
中引用了instance.step
。 instance
属于 PIDiscretizerTransformer
类型,无法序列化。因此,您需要在 MapFunction
之外计算 step 并将值传递给函数。那么你的程序应该是可序列化的。
【讨论】:
谢谢!就是这样。在提出问题之前,删除了地图中对instance.step
的调用,但忘记了我在下一次调用 (reduce
) 中也使用了它,所以我仍然收到错误。以上是关于识别 Apache-Flink 中哪个对象不可序列化的主要内容,如果未能解决你的问题,请参考以下文章
哪个更适合用于序列化对象? JsonSerializer 或 JsonConvert
Apache-Flink 1.11 无法在 Java Flink Streamming Job 中通过 SQL Function DDL 使用 Python UDF