apache flink 的联合类型混淆?

Posted

技术标签:

【中文标题】apache flink 的联合类型混淆?【英文标题】:apache flink's union type confusion? 【发布时间】:2015-09-04 13:41:19 【问题描述】:

我尝试联合多个 flink DataSet。它们包含在 Seq.以下是产生问题的代码

case class clickZap ( date: LocalDateTime, stbId:String, channelId :Int , nozap:Boolean)
val afterLastz: DataSet[clickZap]= ... 

val ma_range: IndexedSeq[DataSet[(Int, Option[(java.time.LocalDateTime, String, Int, Boolean)])]]  = for (i  <- Range (0,min_n))
      yield afterLastz.reduceGroup(it =>(i, maxBeforezTCZ(it,at plusMinutes(i))))
//val ma_all =  ma_range.slice(1, min_n).foldLeft(ma_range.head)(_ union _)
val ma_all = ma_range.head union(ma_range.tail.head)

我得到的是一个

线程“main”中的异常 org.apache.flink.api.common.InvalidProgramException:无法联合 不同类型的输入。 Input1=scala.Tuple2(_1: 整数, _2: Option[scala.Tuple4(_1: GenericType [java.time.LocalDateTime], _2: 字符串,_3:整数,_4:布尔值)]),输入2 = scala.Tuple2(_1:整数, _2: Option[scala.Tuple4(_1: GenericType[java.time.LocalDateTime], _2: String, _3: Integer, _4: Boolean)])

我错过了什么?类型没有区别,是吗?工会运营商应该是便宜的,所以绕过这个问题似乎没有吸引力。 我提供了前两行代码作为 DataSet 中数据类型相同的参数。 我使用了 flink 版本 0.9.0 和 0.9.1

【问题讨论】:

GenericType 不一定要相等。它实际上是什么数据类型? 您真的要对Range(0, min_n) 中的每个元素执行全归约操作吗?为afterLastz 中的每个元素创建不同时差的所有可能值,然后按时差分组,然后在每个组中计算maxBeforezTCZ 不是更好吗?这样你也可以避免union。无论如何,知道afterLastz 的类型会很有帮助。 Mathias 的数据类型参数是 java.time.LocalDateTime。 直到afterLastz的类型为DataSet[clickZap]。 clickZap 是一个案例类。我在问题中的代码中添加了注释。 【参考方案1】:

问题是 Flink 自己的打字系统中的一个错误。代表 Scala OptionOptionTypeInfo 没有定义正确的 equals 方法。因此,两个OptionTypeInfos 未被检测到相等。

我创建了一个JIRA issue 并打开了一个Pull Request 来解决问题。拉取请求应在两天内合并。如果您随后使用最新的0.10-SNAPSHOT 版本,那么您的问题应该得到解决。

【讨论】:

这阐明了这个问题。缺少equals()方法是reflected in the documentation.我的猜测是Either/EitherTypeInfo也有同样的问题。 @SpyrosKomninos,你是完全正确的。感谢您抓住它 :-) 我也会解决这个错误。

以上是关于apache flink 的联合类型混淆?的主要内容,如果未能解决你的问题,请参考以下文章

Flink 代码混淆

Flink 代码混淆

Flink 代码混淆

[Essay] Apache Flink:十分可靠,一分不差

使用别名时的 Apache VirtualHost 混淆

类型提示混淆