Flink:The generic type parameters of ‘Collector‘ are missing 类型擦除

Posted 温岚万叶

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink:The generic type parameters of ‘Collector‘ are missing 类型擦除相关的知识,希望对你有一定的参考价值。

类型擦除问题处理



报错日志描述


  • 报错日志:
The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved
// 缺少“Collector”的泛型类型参数。在许多情况下,当涉及Java泛型时,lambda方法不能为自动类型提取提供足够的信息
  • 建议日志:
The return type of function 'main(TypeErasure.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
// 由于类型擦除,无法自动确定函数“main(TypeErasure.java:23)”的返回类型
// 您可以通过对转换调用的结果使用returns(…)方法,
// 或者通过让函数实现“ResultTypeQueryable”接口来提供类型信息提示



问题描述


  • Java 8 自身存在的问题:
    • 在使用Java APILambda 的时候,JVM 运行时会擦除类型(泛型类型)
      • Flink 无法准确获取到数据类型
      • 此时就需要我们手动指定类型
    • Scala 就很好的解决的这个问题,无需指定类型
  • 正常情况下编写Java: 会需要手动设置输入格式,和输出格式
source.flatMap(new FlatMapFunction<String, Object>() 
        )    // 输入格式 String,输出格式 Object
  • Java 使用 Lambda 表达式:
source.flatMap(()->
            // 输入和输出格式都没有指定,java 8 无法做自动类型推断
        )
  • 需要手动指定类型:
source.flatMap(()->
           // 所以需要手动指定类型
        ,Types.类型)


报错解决


  • 解决方案:
    • 在Flink中经常使用的类型已经预定义在了 Types 中它们的 serializer/deserializer 和 Comparator 已经定义好了
    • Tuple 类型既可以使用 TypeHint 指定又可以使用 Types 指定
  • 问题代码:
SingleOutputStreamOperator<String> flatMap = source.flatMap((line, collect) -> 
            String[] words = line.split(" ");
            for (String word : words) 
                collect.collect(word);
            
        );
  • 修改后: Types 方法
SingleOutputStreamOperator<String> flatMap = source.flatMap((line, collect) -> 
            String[] words = line.split(" ");
            for (String word : words) 
                collect.collect(word);
            
        ,Types.STRING);      // 指定类型
  • 其他案例:
SingleOutputStreamOperator<Tuple2<Object, Integer>> map = flatMap
      .map(word -> Tuple2.of(word, 1)
      ,Types.TUPLE(Types.STRING, Types.INT));
  • ps:简单易懂,我比较喜欢用这种




其他方法



方法一:TypeInformation

  • TypeInformation 是Flink类型系统的核心,是生成序列化/反序列化工具和 Comparator 的工具类
    • 同时它还是连接schema和编程语言内部类型系统的桥梁
  • 可以使用 of 方法创建 TypeInformation :
    • of(Class typeClass):从 Class 创建
    • of(TypeHint typeHint):从 TypeHint 创建

方法二:TypeHint

  • 由于泛型类型在运行时会被JVM擦除,所以说我们无法使用
    • TypeInformation.of(XXX.class) 方式指定带有泛型的类型
  • 为了可以支持泛型类型,Flink引入了 TypeHint
    • 例如我们需要获取 Tuple2<String, Long> 的类型信息,可以使用如下方式:
TypeInformation<Tuple2<String, Long>> info = TypeInformation.of(new TypeHint<Tuple2<String, Long>>());
// 或者
TypeInformation<Tuple2<String, Long>> info = new TypeHint<Tuple2<String, Long>>().getTypeInfo();

下班…

以上是关于Flink:The generic type parameters of ‘Collector‘ are missing 类型擦除的主要内容,如果未能解决你的问题,请参考以下文章

Flink SQL xxx is not serializable. The object probably contains or references non serializable field

Flink SQL xxx is not serializable. The object probably contains or references non serializable field

unity报错The type or namespace name `NavMeshAgent' could not be found.

Assets/FollowDestination.cs(6,13): error CS0246: The type or namespace name `NavMeshAgent' could

解决:The content of element type "web-app" must match "(icon?display

Flink The new key serializer must be compatible with the previous key serializer