Flink SQL 知其所以然(五) 自定义 protobuf format

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink SQL 知其所以然(五) 自定义 protobuf format相关的知识,希望对你有一定的参考价值。

参考技术A

protobuf 作为目前各大公司中最广泛使用的高效的协议数据交换格式工具库,会大量作为流式数据传输的序列化方式,所以在 flink sql 中如果能实现 protobuf 的 format 会非常有用( 目前社区已经有对应的实现,不过目前还没有 merge,预计在 1.14 系列版本中能 release )。

issue 见: https://issues.apache.org/jira/browse/FLINK-18202?filter=-4&jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20%22New%20Feature%22%20AND%20text%20~%20protobuf%20order%20by%20created%20DESC

pr 见: https://github.com/apache/flink/pull/14376

这一节主要介绍 flink sql 中怎么自定义实现 format ,其中以最常使用的 protobuf 作为案例来介绍。

如果想在本地直接测试下:

关于为什么选择 protobuf 可以看这篇文章,写的很详细:

http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral

在实时计算的领域中,为了可读性会选择 json ,为了效率以及一些已经依赖了 grpc 的公司会选择 protobuf 来做数据序列化,那么自然而然,日志的序列化方式也会选择 protobuf 。

而官方目前已经 release 的版本中是没有提供 flink sql api 的 protobuf format 的。如下图,基于 1.13 版本。

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/

因此本文在介绍怎样自定义一个 format 的同时,实现一个 protobuf format 来给大家使用。

预期效果是先实现几种最基本的数据类型,包括 protobuf 中的 message (自定义 model)、 map (映射)、 repeated (列表)、其他基本数据类型等,这些都是我们最常使用的类型。

预期 protobuf message 定义如下:

测试数据源数据如下,博主把 protobuf 的数据转换为 json,以方便展示,如下图:

预期 flink sql:

数据源表 DDL:

数据汇表 DDL:

Transform 执行逻辑:

下面是我在本地跑的结果:

可以看到打印的结果,数据是正确的被反序列化读入,并且最终输出到 console。

目前业界可以参考的实现如下: https://github.com/maosuhan/flink-pb , 也就是这位哥们负责目前 flink protobuf 的 format。

这种实现的具体使用方式如下:

其实现有几个特点:

[图片上传失败...(image-66c35b-1644940704671)]

其实上节已经详细描述了 flink sql 对于 source\\sink\\format 的加载机制。

如图 serde format 是通过 TableFactoryHelper.discoverDecodingFormat 和 TableFactoryHelper.discoverEncodingFormat 创建的

所有通过 SPI 的 source\\sink\\formt 插件都继承自 Factory 。

整体创建 format 方法的调用链如下图。

最终实现如下,涉及到了几个实现类:

具体流程:

上述实现类的具体关系如下:

介绍完流程,进入具体实现方案细节:

ProtobufFormatFactory 主要创建 format 的逻辑:

resources\\META-INF 文件:

ProtobufRowDataDeserializationSchema 主要实现反序列化的逻辑:

可以注意到上述反序列化的主要逻辑就集中在 runtimeConverter 上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter 。

ProtobufToRowDataConverters.ProtobufToRowDataConverter 就是在 ProtobufToRowDataConverters 中定义的。

ProtobufToRowDataConverters.ProtobufToRowDataConverter 其实就是一个 convertor 接口:

其作用就是将 protobuf message 中的每一个字段转换成为 RowData 中的每一个字段。

ProtobufToRowDataConverters 中就定义了具体转换逻辑,如截图所示,每一个 LogicalType 都定义了 protobuf message 字段转换为 flink 数据类型的逻辑:

源码后台回复 flink sql 知其所以然(五)| 自定义 protobuf format 获取。

本文主要是针对 flink sql protobuf format 进行了原理解释以及对应的实现。
如果你正好需要这么一个 format,直接后台回复 flink sql 知其所以然(五)| 自定义 protobuf format 获取源码吧。

当然上述只是 protobuf format 一个基础的实现,用于生产环境还有很多方面可以去扩展的。

以上是关于Flink SQL 知其所以然(五) 自定义 protobuf format的主要内容,如果未能解决你的问题,请参考以下文章

flink sql 知其所以然| 自定义 redis 数据维表(附源码)

Flink SQL实战演练之自定义Clickhouse Connector

Flink SQL 自定义 redis connector

flink sql 知其所以然| sourcesink 原理

Flink Sql 自定义实现 kudu connector

Flink实战系列Flink SQL 写入 kafka 自定义分区策略