Fixing the Flink error: Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException

Posted by 虎鲸不是鱼


Bug description

This error can appear when you write a WordCount program and let IDEA's built-in quick-fix automatically convert the explicit (anonymous-class) code into lambda expressions, for example:

package com.zhiyong.flinkStudy;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

@Slf4j
public class flinkWordCountDemo1 {
    public static void main(String[] args) throws Exception {
//        System.out.println("Java environment OK");

        String inputPath = "E:/study/flink/data/test1";

        //initLogRecord.initLog();
        //log.info("Flink environment OK; starting batch wordCount on path " + inputPath);

        System.out.println("Flink environment OK; starting batch wordCount on path " + inputPath);

        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Source: read the input
        DataSource<String> data = env.readTextFile(inputPath);

        // Transformations
        AggregateOperator<Tuple2<String, Integer>> result = data.flatMap((FlatMapFunction<String, String>) (s, collector) -> {
            String[] split = s.trim().split("\\s+");
            for (String cell : split) {
                collector.collect(cell);
            }
        }).map((MapFunction<String, Tuple2<String, Integer>>) s -> Tuple2.of(s, 1)).groupBy(0).sum(1);

        // Sink: print the result
        result.print();

        // Execute
        //env.execute("needed after print() in old versions");
    }
}


Running it produces the following error:

Flink environment OK; starting batch wordCount on path E:/study/flink/data/test1
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(flinkWordCountDemo1.java:32)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.java.DataSet.getType(DataSet.java:181)
	at org.apache.flink.api.java.DataSet.map(DataSet.java:220)
	at com.zhiyong.flinkStudy.flinkWordCountDemo1.main(flinkWordCountDemo1.java:38)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.extractTypeFromLambda(TypeExtractionUtils.java:188)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:557)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:174)
	at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:271)
	at com.zhiyong.flinkStudy.flinkWordCountDemo1.main(flinkWordCountDemo1.java:32)

Process finished with exit code 1

This is a case of IDEA's good intentions doing harm. In Spark you can let IDEA convert code to lambda expressions freely (at least I have not hit any big problems so far), but in Flink, remember: do not casually convert to lambda expressions! Do not casually convert to lambda expressions! Do not casually convert to lambda expressions! Important things are worth saying three times.

Root cause

With a lambda expression, the framework has to work out the function's input and output types on its own. Because Java erases the generic parameters of lambdas (here the Collector<String> argument of the flatMap), Flink's type extractor cannot recover the output type and throws the exception above.
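As the exception text itself hints, there is also a workaround this article does not pursue: keep the lambdas and hand the erased output types to Flink explicitly with returns(...). A minimal sketch, assuming the same input path as above (the class name FlinkWordCountReturnsHintDemo is hypothetical, not from the original code):

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Hypothetical demo (not from the original post): the lambdas are kept, and the
// erased output types are supplied explicitly via returns(...).
public class FlinkWordCountReturnsHintDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> data = env.readTextFile("E:/study/flink/data/test1");

        AggregateOperator<Tuple2<String, Integer>> result = data
                .flatMap((String s, Collector<String> collector) -> {
                    for (String cell : s.trim().split("\\s+")) {
                        collector.collect(cell);
                    }
                })
                .returns(Types.STRING)                          // output type of the flatMap lambda
                .map(s -> Tuple2.of(s, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))  // output type of the map lambda
                .groupBy(0)
                .sum(1);

        result.print();
    }
}

Either returns(...) call could also take a TypeHint; the point is simply that the output type of each lambda is stated by hand instead of being inferred.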

Solutions

Option 1: drop the lambda expressions

package com.zhiyong.flinkStudy;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

@Slf4j
public class flinkWordCountDemo1 {
    public static void main(String[] args) throws Exception {

        String inputPath = "E:/study/flink/data/test1";

        System.out.println("Flink environment OK; starting batch wordCount on path " + inputPath);

        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Source: read the input
        DataSource<String> data = env.readTextFile(inputPath);

        // Transformations
        AggregateOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] split = s.trim().split("\\s+");
                for (String cell : split) {
                    collector.collect(cell);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        }).groupBy(0).sum(1);

        // Sink: print the result
        result.print();
    }
}

Once the input and output types are written out explicitly (here by replacing the lambdas with anonymous inner classes), the job runs normally:

Flink environment OK; starting batch wordCount on path E:/study/flink/data/test1
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.utils.PlanGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
(hehe,2)
(123,3)
(,2)
(宝宝,1)
(haha,1)
(宝贝,2)
(呵呵,4)
(数码宝贝,1)
(喜欢,2)
(哈哈,1)

Process finished with exit code 0

Option 2: specify the generics explicitly in separate implementing classes

First, implement FlatMapFunction:

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class FlinkWordCountDemo2FlatMapFunction implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        String[] split = s.trim().split("\\s+");
        for (String cell : split) {
            collector.collect(cell);
        }
    }
}

Next, implement MapFunction:

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class FlinkWordCountDemo2MapFunction implements MapFunction<String, Tuple2<String, Integer>> {

    @Override
    public Tuple2<String, Integer> map(String s) throws Exception {
        return Tuple2.of(s, 1);
    }
}

Then call both of them from the main method:

package com.zhiyong.flinkStudy;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;

public class FlinkWordCountDemo2 {
    public static void main(String[] args) throws Exception {
        String inputPath = "E:/study/flink/data/test1";
        System.out.println("Flink environment OK; starting batch wordCount on path " + inputPath);
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Source: read the input
        DataSource<String> data = env.readTextFile(inputPath);

        AggregateOperator result = data.flatMap(new FlinkWordCountDemo2FlatMapFunction()).map(new FlinkWordCountDemo2MapFunction())
                .groupBy(0).sum(1);

        result.print();
    }
}

The correct result then appears:

Flink environment OK; starting batch wordCount on path E:/study/flink/data/test1
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.utils.PlanGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
(hehe,2)
(123,3)
(,2)
(宝宝,1)
(haha,1)
(宝贝,2)
(呵呵,4)
(数码宝贝,1)
(喜欢,2)
(哈哈,1)

Process finished with exit code 0

If the implementing classes do not declare their generic types (say we get lazy and strip the generics from the FlatMapFunction implementation):

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class FlinkWordCountDemo2FlatMapFunction implements FlatMapFunction {
//    @Override
//    public void flatMap(String s, Collector<String> collector) throws Exception {
//        String[] split = s.trim().split("\\s+");
//        for (String cell : split) {
//            collector.collect(cell);
//        }
//    }

    @Override
    public void flatMap(Object o, Collector collector) throws Exception {
        String[] split = o.toString().trim().split("\\s+");
        for (String cell : split) {
            collector.collect(cell);
        }
    }
}


And, getting lazy again, strip the generics from the MapFunction implementation as well:

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class FlinkWordCountDemo2MapFunction implements MapFunction {

//    @Override
//    public Tuple2<String, Integer> map(String s) throws Exception {
//        return Tuple2.of(s, 1);
//    }

    @Override
    public Object map(Object o) throws Exception {
        return Tuple2.of(o.toString(), 1);
    }
}

With FlinkWordCountDemo2 unchanged, running it still fails with the same InvalidTypesException:

Flink environment OK; starting batch wordCount on path E:/study/flink/data/test1
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkWordCountDemo2.java:17)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.java.DataSet.getType(DataSet.java:181)
	at org.apache.flink.api.java.DataSet.map(DataSet.java:220)
	at com.zhiyong.flinkStudy.FlinkWordCountDemo2.main(FlinkWordCountDemo2.java:17)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The types of the interface org.apache.flink.api.common.functions.FlatMapFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
	at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1384)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterTypeFromGenericType(TypeExtractor.java:1412)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1369)
	at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:811)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:575)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:174)
	at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:271)
	... 1 more

Process finished with exit code 1

Flink's support for lambda expressions still leaves something to be desired, and for now these rules have to be followed. The advantage of option 2 is that the computation logic is extracted into its own classes, which keeps the main class short and readable and lets the same functions be reused wherever the same logic is needed.
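The exception message also names one more route in the spirit of option 2: let the function class implement ResultTypeQueryable and report its output type itself. A hedged sketch (the class name is hypothetical and not part of the original demos) that would make even the raw-typed FlatMapFunction above usable; the raw MapFunction would need the same treatment:

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;

// Hypothetical variant (not from the original post): a raw FlatMapFunction that
// declares its own output type via ResultTypeQueryable instead of generics.
public class FlinkWordCountDemo2QueryableFlatMapFunction
        implements FlatMapFunction, ResultTypeQueryable<String> {

    @Override
    public void flatMap(Object o, Collector collector) throws Exception {
        for (String cell : o.toString().trim().split("\\s+")) {
            collector.collect(cell);
        }
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING; // the type Flink would otherwise fail to infer
    }
}

For a job this small, the plainly genericized classes of option 2 are simpler; ResultTypeQueryable mainly pays off when the generic parameters genuinely cannot be written on the implements clause.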

Additional notes

Version where the problem occurred: Flink 1.14.3. The POM is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>study</artifactId>
        <groupId>study.zhiyong</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flinkStudy</artifactId>
    <packaging>pom</packaging>

    <!--    <properties>-->
    <!--        <maven.compiler.source>8</maven.compiler.source>-->
    <!--        <maven.compiler.target>8</maven.compiler.target>-->
    <!--    </properties>-->


    <!-- Repository locations, in order: the aliyun, cloudera, and apache repositories -->
    <repositories>