Flink的类型与序列化

Posted jordan95225

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink的类型与序列化相关的知识,希望对你有一定的参考价值。

1.序言

Flink在内部以二进制的格式将数据保存,由于普通的Java对象类型和内部二进制格式不一致,需要一套相互转换机制来进行序列化和反序列化。

2.DataStream类型系统

2.1 物理类型

Flink支持的物理类型如下图所示:

技术图片

分为基础类型、数组类型、复合类型、辅助类型、泛型和其他类型,如果用户需要自定义类型的话,需要注册该类型并自己实现序列化和反序列化的方法。

对于没有提供的自定义类型,Flink为了程序正常运行,会交给Kryo进行序列化,缺点是序列化和反序列化效率较低。

2.2 逻辑类型

逻辑类型是物理类型的描述,Flink运行时会根据逻辑类型进行数据的序列化和反序列化。

在Flink中使用TypeInformation来描述逻辑类型,该类是一个抽象类,所有逻辑类型继承该类,分类如下图所示:

技术图片

在序列化过程中,所有逻辑类型都必须实现createSerializer(ExecutionConfig config)方法来创建序列化器。

2.3 类型推断

开发者使用物理类型,Flink运行时使用逻辑类型,所以需要从物理类型转换为逻辑类型,Java使用反射机制获取Function的输入输出。

2.3.1 类型提取时机

在使用DataStream接口的时候,会触发类型的提取,如下面map()方法所示:

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
		// 提取类型
		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
				Utils.getCallLocationName(), true);

		return map(mapper, outType);
	}

2.3.2 自动类型推断

Flink会首先进行类型推断,如果用尽各种手段都无法推测出泛型信息时,用户需要传入一个类型提示TypeHint,来获取泛型的类型信息,如下:

TypeInformation.of(new TypeHint<Tuple2<Integer,String>>(){})

2.4 显式声明

一般情况下,可以通过TypeInformation.of()方法来显式创建一个类型信息的对象,如下:

PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);

2.5 存在问题

2.5.1 类型提取

由于泛型的类型擦除导致类型提取不能总是有效,所以有时候需要自己手动指定。

2.5.2 使用Flink自带的Kryo

使用org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer而不要使用com.esotericsoftware.kryo.serializers.JavaSerializer,防止和Flink不兼容。

3.SQL类型系统

Flink1.9之后引入了新的SQL类型系统,解决了DataStream在SQL中的兼容性、精度、类型等问题。

Flink SQL中使用DataType中的LogicalType类型系统来描述类型信息,在Flink SQL执行时,最终还是要转换为TypeInformation。

Row表示表中的一行数据或者一条记录,在1.9版本之前,Flink SQL使用org.apache.flink.types.Row,在1.9版本之后,使用org.apache.flink.table.dataformat.BaseRow及其子类,下面主要介绍Blink Row。

Blink Row分为列式存储和行式存储,结构如下所示:

技术图片

区别如下:

  1. BinaryRow:表示二进制形式存储,由定长部分和不定长部分组成,定长部分在一个MemorySegment中
  2. NestedRow:同上,但是定长部分可以在不同MemorySegment中
  3. UpdatableRow:保存所有字段数据,更新数据的时候使用一个数组记录被修改字段的最新值,读取数据的时候去判断是否被更新过再读取
  4. ObjectArrayRow:对象存储,多对象的序列化和反序列化过程,成本比二进制高,两个实现类,GenericRow表示原始类型,BoxedWrapperRow表示对象类型
  5. JoinedRow:Join过程中两行数据的逻辑结构
  6. ColumnarRow:二进制列式存储,不同于前面都是行式存储

以上是关于Flink的类型与序列化的主要内容,如果未能解决你的问题,请参考以下文章

Flink Table API & SQL 类型系统 DataType

Flink Table API & SQL 类型系统 DataType

Flink DataStream 类型系统 TypeInformation

flink支持的数据类型讲解(可序列化) 和 内置累加器的运用

Flink 与 Confluent Kafka 模式注册表

flink 计算资源之 原理与实现