Flink的类型与序列化
Posted jordan95225
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink的类型与序列化相关的知识,希望对你有一定的参考价值。
1.序言
Flink在内部以二进制的格式将数据保存,由于普通的Java对象类型和内部二进制格式不一致,需要一套相互转换机制来进行序列化和反序列化。
2.DataStream类型系统
2.1 物理类型
Flink支持的物理类型如下图所示:
分为基础类型、数组类型、复合类型、辅助类型、泛型和其他类型,如果用户需要自定义类型的话,需要注册该类型并自己实现序列化和反序列化的方法。
对于没有提供的自定义类型,Flink为了程序正常运行,会交给Kryo进行序列化,缺点是序列化和反序列化效率较低。
2.2 逻辑类型
逻辑类型是物理类型的描述,Flink运行时会根据逻辑类型进行数据的序列化和反序列化。
在Flink中使用TypeInformation来描述逻辑类型,该类是一个抽象类,所有逻辑类型继承该类,分类如下图所示:
在序列化过程中,所有逻辑类型都必须实现createSerializer(ExecutionConfig config)方法来创建序列化器。
2.3 类型推断
开发者使用物理类型,Flink运行时使用逻辑类型,所以需要从物理类型转换为逻辑类型,Java使用反射机制获取Function的输入输出。
2.3.1 类型提取时机
在使用DataStream接口的时候,会触发类型的提取,如下面map()方法所示:
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
// 提取类型
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
Utils.getCallLocationName(), true);
return map(mapper, outType);
}
2.3.2 自动类型推断
Flink会首先进行类型推断,如果用尽各种手段都无法推测出泛型信息时,用户需要传入一个类型提示TypeHint,来获取泛型的类型信息,如下:
TypeInformation.of(new TypeHint<Tuple2<Integer,String>>(){})
2.4 显式声明
一般情况下,可以通过TypeInformation.of()方法来显式创建一个类型信息的对象,如下:
PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);
2.5 存在问题
2.5.1 类型提取
由于泛型的类型擦除导致类型提取不能总是有效,所以有时候需要自己手动指定。
2.5.2 使用Flink自带的Kryo
使用org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer而不要使用com.esotericsoftware.kryo.serializers.JavaSerializer,防止和Flink不兼容。
3.SQL类型系统
Flink1.9之后引入了新的SQL类型系统,解决了DataStream在SQL中的兼容性、精度、类型等问题。
Flink SQL中使用DataType中的LogicalType类型系统来描述类型信息,在Flink SQL执行时,最终还是要转换为TypeInformation。
Row表示表中的一行数据或者一条记录,在1.9版本之前,Flink SQL使用org.apache.flink.types.Row,在1.9版本之后,使用org.apache.flink.table.dataformat.BaseRow及其子类,下面主要介绍Blink Row。
3.1 Blink Row
Blink Row分为列式存储和行式存储,结构如下所示:
区别如下:
- BinaryRow:表示二进制形式存储,由定长部分和不定长部分组成,定长部分在一个MemorySegment中
- NestedRow:同上,但是定长部分可以在不同MemorySegment中
- UpdatableRow:保存所有字段数据,更新数据的时候使用一个数组记录被修改字段的最新值,读取数据的时候去判断是否被更新过再读取
- ObjectArrayRow:对象存储,多对象的序列化和反序列化过程,成本比二进制高,两个实现类,GenericRow表示原始类型,BoxedWrapperRow表示对象类型
- JoinedRow:Join过程中两行数据的逻辑结构
- ColumnarRow:二进制列式存储,不同于前面都是行式存储
以上是关于Flink的类型与序列化的主要内容,如果未能解决你的问题,请参考以下文章
Flink Table API & SQL 类型系统 DataType
Flink Table API & SQL 类型系统 DataType
Flink DataStream 类型系统 TypeInformation