Flink SQL 知其所以然(五) 自定义 protobuf format
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink SQL 知其所以然(五) 自定义 protobuf format相关的知识,希望对你有一定的参考价值。
参考技术Aprotobuf 作为目前各大公司中最广泛使用的高效的协议数据交换格式工具库,会大量作为流式数据传输的序列化方式,所以在 flink sql 中如果能实现 protobuf 的 format 会非常有用( 目前社区已经有对应的实现,不过目前还没有 merge,预计在 1.14 系列版本中能 release )。
issue 见: https://issues.apache.org/jira/browse/FLINK-18202?filter=-4&jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20%22New%20Feature%22%20AND%20text%20~%20protobuf%20order%20by%20created%20DESC
pr 见: https://github.com/apache/flink/pull/14376
这一节主要介绍 flink sql 中怎么自定义实现 format ,其中以最常使用的 protobuf 作为案例来介绍。
如果想在本地直接测试下:
关于为什么选择 protobuf 可以看这篇文章,写的很详细:
http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral
在实时计算的领域中,为了可读性会选择 json ,为了效率以及一些已经依赖了 grpc 的公司会选择 protobuf 来做数据序列化,那么自然而然,日志的序列化方式也会选择 protobuf 。
而官方目前已经 release 的版本中是没有提供 flink sql api 的 protobuf format 的。如下图,基于 1.13 版本。
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/
因此本文在介绍怎样自定义一个 format 的同时,实现一个 protobuf format 来给大家使用。
预期效果是先实现几种最基本的数据类型,包括 protobuf 中的 message (自定义 model)、 map (映射)、 repeated (列表)、其他基本数据类型等,这些都是我们最常使用的类型。
预期 protobuf message 定义如下:
测试数据源数据如下,博主把 protobuf 的数据转换为 json,以方便展示,如下图:
预期 flink sql:
数据源表 DDL:
数据汇表 DDL:
Transform 执行逻辑:
下面是我在本地跑的结果:
可以看到打印的结果,数据是正确的被反序列化读入,并且最终输出到 console。
目前业界可以参考的实现如下: https://github.com/maosuhan/flink-pb , 也就是这位哥们负责目前 flink protobuf 的 format。
这种实现的具体使用方式如下:
其实现有几个特点:
[图片上传失败...(image-66c35b-1644940704671)]
其实上节已经详细描述了 flink sql 对于 source\\sink\\format 的加载机制。
如图 serde format 是通过 TableFactoryHelper.discoverDecodingFormat 和 TableFactoryHelper.discoverEncodingFormat 创建的
所有通过 SPI 的 source\\sink\\formt 插件都继承自 Factory 。
整体创建 format 方法的调用链如下图。
最终实现如下,涉及到了几个实现类:
具体流程:
上述实现类的具体关系如下:
介绍完流程,进入具体实现方案细节:
ProtobufFormatFactory 主要创建 format 的逻辑:
resources\\META-INF 文件:
ProtobufRowDataDeserializationSchema 主要实现反序列化的逻辑:
可以注意到上述反序列化的主要逻辑就集中在 runtimeConverter 上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter 。
ProtobufToRowDataConverters.ProtobufToRowDataConverter 就是在 ProtobufToRowDataConverters 中定义的。
ProtobufToRowDataConverters.ProtobufToRowDataConverter 其实就是一个 convertor 接口:
其作用就是将 protobuf message 中的每一个字段转换成为 RowData 中的每一个字段。
ProtobufToRowDataConverters 中就定义了具体转换逻辑,如截图所示,每一个 LogicalType 都定义了 protobuf message 字段转换为 flink 数据类型的逻辑:
源码后台回复 flink sql 知其所以然(五)| 自定义 protobuf format 获取。
本文主要是针对 flink sql protobuf format 进行了原理解释以及对应的实现。
如果你正好需要这么一个 format,直接后台回复 flink sql 知其所以然(五)| 自定义 protobuf format 获取源码吧。
当然上述只是 protobuf format 一个基础的实现,用于生产环境还有很多方面可以去扩展的。
以上是关于Flink SQL 知其所以然(五) 自定义 protobuf format的主要内容,如果未能解决你的问题,请参考以下文章
flink sql 知其所以然| 自定义 redis 数据维表(附源码)
Flink SQL实战演练之自定义Clickhouse Connector
flink sql 知其所以然| sourcesink 原理