Fixing the Flink error Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException
Posted by 虎鲸不是鱼
Preface: This article was compiled by the editors of 小常识网 (cha138.com) and covers how to fix the Flink error Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException. We hope it is of some help.
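Bug description
After writing a WordCount program and letting IDEA's built-in intention convert the anonymous classes into lambda expressions, you may run into this error. For example: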
package com.zhiyong.flinkStudy;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
@Slf4j
public class flinkWordCountDemo1 {
    public static void main(String[] args) throws Exception {
        // System.out.println("Java environment OK");
        String inputPath = "E:/study/flink/data/test1";
        //initLogRecord.initLog();
        //log.info("Flink环境正常,开始对路径 " + inputPath +" 执行批处理wordCount");
        System.out.println("Flink环境正常,开始对路径 " + inputPath +" 执行批处理wordCount");
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Source: read the data
        DataSource<String> data = env.readTextFile(inputPath);
        // Transformation: these lambdas (from IDEA's auto-conversion) cause the InvalidTypesException
        AggregateOperator<Tuple2<String, Integer>> result = data.flatMap((FlatMapFunction<String, String>) (s, collector) -> {
            String[] split = s.trim().split("\\s+");
            for (String cell : split) {
                collector.collect(cell);
            }
        }).map((MapFunction<String, Tuple2<String, Integer>>) s -> Tuple2.of(s, 1)).groupBy(0).sum(1);
        // Sink: write the data
        result.print();
        // Execute
        //env.execute("older versions need this line for print");
    }
}
Running it throws:
Flink环境正常,开始对路径 E:/study/flink/data/test1 执行批处理wordCount
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(flinkWordCountDemo1.java:32)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
at org.apache.flink.api.java.DataSet.getType(DataSet.java:181)
at org.apache.flink.api.java.DataSet.map(DataSet.java:220)
at com.zhiyong.flinkStudy.flinkWordCountDemo1.main(flinkWordCountDemo1.java:38)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
at org.apache.flink.api.java.typeutils.TypeExtractionUtils.extractTypeFromLambda(TypeExtractionUtils.java:188)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:557)
at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:174)
at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:271)
at com.zhiyong.flinkStudy.flinkWordCountDemo1.main(flinkWordCountDemo1.java:32)
Process finished with exit code 1
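This is IDEA trying to help and making things worse! In Spark you can let IDEA convert code to lambdas freely (at least I have not hit any major problems so far). In Flink, do NOT casually convert to lambda expressions! Do NOT casually convert to lambda expressions! Do NOT casually convert to lambda expressions! Important things are worth saying three times.
Cause
With a lambda expression, Flink has to infer the operator's output type by itself. Because of Java type erasure, the method that javac generates for the lambda has the raw parameter type Collector in its signature, not Collector<String>. Flink's type extractor therefore cannot tell what the flatMap emits, and it throws InvalidTypesException ("The generic type parameters of 'Collector' are missing"). A lambda whose return type is generic, such as Tuple2<String, Integer> in the map step, runs into the same limitation. The cast to FlatMapFunction<String, String> does not help, because a cast does not change the class that the compiler generates. The Flink version used here (see the notes at the end) cannot recover these types, so the job fails. The error message itself points to another option: supply the type explicitly with returns(...) on the result of the transformation.
Solutions
Method 1: Don't use lambda expressions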
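The erasure effect can be seen without Flink. Below is a minimal plain-Java sketch. The class name ErasureDemo and the use of java.util.function.Function are illustrative choices, not part of Flink, and the sketch is a simplification of what Flink's TypeExtractor does: it only reads the generic interface recorded on the function object's class. An anonymous class records Function<String, Integer> in its class file. The class generated for a lambda implements only the raw Function.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureDemo {
    // Describes the first interface recorded on the object's runtime class:
    // its actual type arguments, or the raw interface name if none were recorded.
    static String describe(Object fn) {
        Type iface = fn.getClass().getGenericInterfaces()[0];
        if (iface instanceof ParameterizedType) {
            StringBuilder sb = new StringBuilder("parameterized: ");
            Type[] typeArgs = ((ParameterizedType) iface).getActualTypeArguments();
            for (int i = 0; i < typeArgs.length; i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(typeArgs[i].getTypeName());
            }
            return sb.toString();
        }
        return "raw: " + iface.getTypeName();
    }

    public static void main(String[] args) {
        // Anonymous class: the class file records Function<String, Integer>
        Function<String, Integer> anonymous = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
        // Lambda: the generated class implements only the raw Function interface
        Function<String, Integer> lambda = s -> s.length();

        System.out.println(describe(anonymous)); // parameterized: java.lang.String, java.lang.Integer
        System.out.println(describe(lambda));    // raw: java.util.function.Function
    }
}
```

The anonymous-class form still carries its type arguments at runtime. This is why rewriting the functions as anonymous classes lets Flink work out the types.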
package com.zhiyong.flinkStudy;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
@Slf4j
public class flinkWordCountDemo1 {
    public static void main(String[] args) throws Exception {
        String inputPath = "E:/study/flink/data/test1";
        System.out.println("Flink环境正常,开始对路径 " + inputPath +" 执行批处理wordCount");
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Source: read the data
        DataSource<String> data = env.readTextFile(inputPath);
        // Transformation: anonymous classes keep their generic types, so Flink can extract them
        AggregateOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] split = s.trim().split("\\s+");
                for (String cell : split) {
                    collector.collect(cell);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        }).groupBy(0).sum(1);
        // Sink: write the data
        result.print();
    }
}
Write the functions as anonymous classes that spell out their input and output types, and the program runs normally:
Flink环境正常,开始对路径 E:/study/flink/data/test1 执行批处理wordCount
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.utils.PlanGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
(hehe,2)
(123,3)
(好,2)
(宝宝,1)
(haha,1)
(宝贝,2)
(呵呵,4)
(数码宝贝,1)
(喜欢,2)
(哈哈,1)
Process finished with exit code 0
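You can sanity-check the printed counts on a small input with the plain-Java sketch below. It reproduces what the job computes. LocalWordCount and the sample lines are made up for illustration and do not involve Flink. A LinkedHashMap keeps first-seen order, so the ordering will not match Flink's output.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LocalWordCount {
    // Same steps as the Flink job: split each line on whitespace (flatMap),
    // pair each word with 1 (map), then add up the 1s per word (groupBy(0).sum(1)).
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.trim().split("\\s+")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hehe 123 haha", "  hehe  123 ");
        System.out.println(count(lines)); // {hehe=2, 123=2, haha=1}
    }
}
```

The job uses the same s.trim().split("\\s+"), so a completely blank line in the input yields one empty-string "word" in both the sketch and the job.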
Method 2: Declare the generic types explicitly in implementing classes
First implement FlatMapFunction:
package com.zhiyong.flinkStudy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
public class FlinkWordCountDemo2FlatMapFunction implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        // Split each line on whitespace and emit every word
        String[] split = s.trim().split("\\s+");
        for (String cell : split) {
            collector.collect(cell);
        }
    }
}
Then implement MapFunction:
package com.zhiyong.flinkStudy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
public class FlinkWordCountDemo2MapFunction implements MapFunction<String, Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> map(String s) throws Exception {
        // Pair each word with an initial count of 1
        return Tuple2.of(s, 1);
    }
}
Then call them from the main method:
package com.zhiyong.flinkStudy;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
public class FlinkWordCountDemo2 {
    public static void main(String[] args) throws Exception {
        String inputPath = "E:/study/flink/data/test1";
        System.out.println("Flink环境正常,开始对路径 " + inputPath +" 执行批处理wordCount");
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Source: read the data
        DataSource<String> data = env.readTextFile(inputPath);
        // Transformation: the named classes declare their generic types explicitly
        AggregateOperator<Tuple2<String, Integer>> result = data.flatMap(new FlinkWordCountDemo2FlatMapFunction())
                .map(new FlinkWordCountDemo2MapFunction())
                .groupBy(0).sum(1);
        // Sink: write the data
        result.print();
    }
}
The correct result is printed:
Flink环境正常,开始对路径 E:/study/flink/data/test1 执行批处理wordCount
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.utils.PlanGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
(hehe,2)
(123,3)
(好,2)
(宝宝,1)
(haha,1)
(宝贝,2)
(呵呵,4)
(数码宝贝,1)
(喜欢,2)
(哈哈,1)
Process finished with exit code 0
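Suppose the implementing classes do not declare their generic types. For example, you get lazy and drop the generics from the FlatMapFunction implementation: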
package com.zhiyong.flinkStudy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
public class FlinkWordCountDemo2FlatMapFunction implements FlatMapFunction {
//    @Override
//    public void flatMap(String s, Collector<String> collector) throws Exception {
//        String[] split = s.trim().split("\\s+");
//        for (String cell : split) {
//            collector.collect(cell);
//        }
//    }
    // Raw type: no generic parameters are declared, so Flink cannot infer the input/output types
    @Override
    public void flatMap(Object o, Collector collector) throws Exception {
        String[] split = o.toString().trim().split("\\s+");
        for (String cell : split) {
            collector.collect(cell);
        }
    }
}
Then, just as lazily, drop the generics from the MapFunction implementation too:
package com.zhiyong.flinkStudy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
public class FlinkWordCountDemo2MapFunction implements MapFunction {
//    @Override
//    public Tuple2<String, Integer> map(String s) throws Exception {
//        return Tuple2.of(s, 1);
//    }
    // Raw type: the Tuple2<String, Integer> output type is no longer visible to Flink
    @Override
    public Object map(Object o) throws Exception {
        return Tuple2.of(o.toString(), 1);
    }
}
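Leave FlinkWordCountDemo2 unchanged and run it again. It fails with the same InvalidTypesException, this time because the interface types could not be inferred: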
Flink环境正常,开始对路径 E:/study/flink/data/test1 执行批处理wordCount
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkWordCountDemo2.java:17)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
at org.apache.flink.api.java.DataSet.getType(DataSet.java:181)
at org.apache.flink.api.java.DataSet.map(DataSet.java:220)
at com.zhiyong.flinkStudy.FlinkWordCountDemo2.main(FlinkWordCountDemo2.java:17)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The types of the interface org.apache.flink.api.common.functions.FlatMapFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1384)
at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterTypeFromGenericType(TypeExtractor.java:1412)
at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1369)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:811)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:575)
at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:174)
at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:271)
... 1 more
Process finished with exit code 1
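The raw-type failure can be illustrated in plain Java as well. The sketch below uses hypothetical classes TypedLength and RawLength and swaps in java.util.function.Function for Flink's FlatMapFunction/MapFunction. A class that implements Function<String, Integer> records its type arguments in the class file. A class that implements raw Function records none, so nothing is left to infer. Roughly, that is the situation Flink reports as "could not be inferred".

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.function.Function;

public class RawTypeDemo {
    // Declares its type arguments: String in, Integer out
    static class TypedLength implements Function<String, Integer> {
        @Override
        public Integer apply(String s) {
            return s.length();
        }
    }

    // Raw implementation, like the "lazy" function classes above
    @SuppressWarnings("rawtypes")
    static class RawLength implements Function {
        @Override
        public Object apply(Object o) {
            return o.toString().length();
        }
    }

    // Returns the type arguments recorded for the class's first interface,
    // or an empty array when the interface is implemented as a raw type.
    static Type[] declaredTypeArgs(Class<?> clazz) {
        Type iface = clazz.getGenericInterfaces()[0];
        if (iface instanceof ParameterizedType) {
            return ((ParameterizedType) iface).getActualTypeArguments();
        }
        return new Type[0];
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(declaredTypeArgs(TypedLength.class))); // [class java.lang.String, class java.lang.Integer]
        System.out.println(Arrays.toString(declaredTypeArgs(RawLength.class)));   // []
    }
}
```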
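Flink's lambda support still has room for improvement. For now the rule stands: whenever generics are involved, declare the types explicitly instead of relying on a lambda. Method 2 has a further advantage. It pulls the computation logic out of the main class, which keeps the main class short and readable and lets the same logic be reused elsewhere.
Notes
Affected version: Flink 1.14.3. The POM is as follows: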
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>study</artifactId>
<groupId>study.zhiyong</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flinkStudy</artifactId>
<packaging>pom</packaging>
<!-- <properties>-->
<!-- <maven.compiler.source>8</maven.compiler.source>-->
<!-- <maven.compiler.target>8</maven.compiler.target>-->
<!-- </properties>-->
<!-- Repository locations: aliyun, cloudera, and apache, in that order -->
<repositories>