Flink即将在1.7版本发布全新的Kafka连接器
Posted Flink
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink即将在1.7版本发布全新的Kafka连接器相关的知识,希望对你有一定的参考价值。
最近我们向Flink社区贡献了一个全新的Kafka连接器(https://github.com/apache/flink/pull/6703),关联JIRA issue FLINK-9697。它包含了近6000行代码变更,经历了社区近100次的review讨论与评论。这一大特性即将会随Flink 1.7一起发布。这一特性也得到了一众Flink PMC的赞赏:
现状与问题
Apache Kafka作为开源界最流行的消息中间件之一,一直以来被广泛应用于很多大数据处理框架中。毫无疑问,Apache Flink作为新一代的大数据计算框架,也非常重视跟Kafka进行生态衔接。Flink所实现并推崇的端到端的“恰好一次”(Exactly-Once)的事件处理语义的典型场景就是配合Kafka来实现的,由此可见Kafka在Flink的上下游生态中占有着非常重要的地位。
从Kafka 0.8版本开始,Flink都以Kafka client的版本为依据,为不同的Kafka client提供特定的connector。在现有的代码库中,有针对Kafka client 0.8, 0.9, 0.10, 0.11这些版本的连接器。如下图所示:
按照社区之前的计划,随着Kafka 1.0, 1.1, 2.0... 版本的发布,Flink kafka connector也会随之迭代下去,以维护不同版本的Kafka client可能出现的issue。
这种模式在最初没有暴露出明显的问题,而且看起来也是合理的。因为虽然Kafka client承诺保证API的向后兼容性。但是:
不同的client版本维护的特性集合不同,比如 Kafka Producer事务自0.11版本才开始提供;
拆分不同版本的connector有利于项目管理,比如issue的分类维护;
出于测试目的以及其他目的,Kafka connector使用了Mini版的Kafka Server、Admin API、非公开的API,这些都跟特定版本有强绑定关系;
所以,看起来似乎没有毛病。
但与此同时,Kafka社区也在快速发展,版本一个接一个地在向前迭代。目前,Flink的这种以Kafka 版本拆分connector的模式很显然已经处于非常被动的地位,并且由于开源社区的开发模式使其追随的速度也大大滞后Kafka的发版速度。可以看到至今为止,仍然没有发布connector-1.0(这个connector原本是打算构建的,也是由Oceanus团队提交的PR,但社区明显发现了这种模式存在着很大的问题,具体的问题我们后面会详谈,这些问题也催生了一个全新的connector的诞生)。
版本与兼容性
很多系统在版本演进的过程中都对外号称保证向后兼容性。兼容性是一个框架是否成熟的考量标准之一,但某种意义上它既是优势也是负担,而将多个系统混在一起,存在多种版本不同的依赖关系又使得用户感到很困扰。就拿 Flink 跟 Kafka 来举例,用户使用Flink 1.6.0版本,但是Kafka Server的版本已经升级到了2.0。但是官方对Flink 1.6.0版本只提供了针对0.11版本的connector:
也许有人会认为,将Flink-connector-kafka-0.11对Kafka client的依赖升级到2.0不就可以了?理想情况下确实应该这样。但是,Flink connector无法让你做到这一点。因为由于一些技术实现的需求,比如Producer端事务的恢复操作,使得Flink在实现connector的时候不得不借助于反射机制来获取client特定类的内部变量来执行相应的逻辑。我们都知道,所谓的兼容性保证只针对公开的API,对于类的内部实现没有哪个框架会做出这种保证。接下来我们会看到,正是因为这一点阻止了我们直接升级Kafka client依赖的方案。退一步说,就算内部实现没有改变,但如果没有通过Flink已有的针对特性构建的集成测试,你也无法确定这种做法在线上是否能正常工作。
新连接器的优势
新连接器做了这样一个决定——只跟踪最新版本的Kafka client并保证它跟Flink的完美衔接。当然由于一些历史原因(Flink原先实现Kafka connector的模式)以及实现问题(少量代码使用反射访问Kafka client的内部变量),已有的Kafka connector会继续保留且不会在新连接器里提供适配与支持。但自Kafka 1.0开始(到目前为止已有三个client版本1.0、1.1、2.0)新连接器都可以支持并能通过Flink全部的集成测试。它所体现出来的优势是:一旦后续Kafka发布新的client版本,这个connector就会开始适配新版本:
升级Kafka client的依赖;
升级connector并提供新功能;
适配非公开API的改动;
适配Admin API的改动;
重构集成测试验证以相关的feature;
以端到端的测试验证升级过client版本的connector访问旧版本的Kafka Server;
而不是再次构建一个针对新版本的Kafka connector。
新连接器的实现
简而言之,新连接器的实现包含量两部分的工作:
重构:部分重构基础连接器以及老版本的连接器(尤其是类名称);
实现:提供一个新连接器的实现;
Flink提供了一个基础connector模块,它是实现所有connector的核心模块,所有的connector都依赖于基础connector。而由于旧connector按Kafka client版本拆分的方式,存在不同的feature在不同的connector中对外开放的问题,所以出现了太多的级联依赖,后实现的connector保有对基础connector以及离它最近版本的connector的依赖,而在测试中甚至出现了跨版本的依赖。这些继承体系中,有的类名中带有“Base”字样,有的类名中带有“08”、“09”等版本号字样。我们对新connector的使用的统一命名约定是:类名中将不会携带任何版本号信息,新connector只依赖基础connector且不再引入对老版本connector的直接或间接依赖。
在对现有的connector进行重构以确保新connector能满足其命名规范后,我们开始实现新connector。引入新connector不仅仅是进行类文件迁移这么简单。我们需要知道,当前Flink源码库中最新的connector所支持的Kafka client版本( 0.11)跟Kafka最新发布的2.0的client的版本的差异(尤其是client内部的变动、admin API的变化)很大。这里面我们列举几个典型的差异:
Flink 进行事务恢复需要用到的sequenceNumbers在新版本内部被重命名为了nextSequence;
之前不少基于Zookeeper获取元数据的API被移除,转而使用Admin API;
Slf4j的依赖版本高于Flink自身对其的依赖版本,导致类加载报错;
所有“kafka.consumer”开头的包导入在2.0都已被废弃(相应的包中的类被删除);
Kafka client一些API修改了默认的超时时间,导致Flink原有的集成测试报错;
......
除此之外,我们还需要破除多版本的级联依赖,重新梳理代码以使得它适配新的connector。
总结
我们为新connector创建了一个独立的umbrella issue(FLINK-10598)用于归档该新connector所产生的各种问题,目前新connector还有不少事情要做,比如文档、端到端的测试等等。 另外,我们在即将发布的1.7版本中也贡献了不少PR,后续有机会再跟大家解读。
以上是关于Flink即将在1.7版本发布全新的Kafka连接器的主要内容,如果未能解决你的问题,请参考以下文章
Flink 1.14.0 全新的 Kafka Connector
FlinkFlink 1.14.0 全新的 Kafka Connector