Flink 1.0到1.9特性

Posted zclaude

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 1.0到1.9特性相关的知识,希望对你有一定的参考价值。

Flink API 历史变迁

技术图片

在 Flink 1.0.0 时期,加入了 State API,即 ValueState、ReducingState、ListState 等等。State API 可以认为是 Flink 里程碑式的创新,它能够让用户像使用 Java 集合一样地使用 Flink State,却能够自动享受到状态的一致性保证,不会因为故障而丢失状态。包括后来 Apache Beam 的 State API 也从中借鉴了很多。

在 Flink 1.1.0 时期,支持了 Session Window 并且能够正确的处理乱序的迟到数据,使得最终结果是正确的

在 Flink 1.2.0 时期,提供了 ProcessFunction,这是一个 Lower-level 的API,用于实现更高级更复杂的功能。它除了能够注册各种类型的 State 外,还支持注册定时器(支持 EventTime 和 ProcessingTime),常用于开发一些基于事件、基于时间的应用程序。

在 Flink 1.3.0 时期,提供了 Side Output 功能。算子的输出一般只有一种输出类型,但是有些时候可能需要输出另外的类型,比如除了输出主流外,还希望把一些异常数据、迟到数据以侧边流的形式进行输出,并分别交给下游不同节点进行处理。简而言之,Side Output 支持了多路输出的功能。

在 Flink 1.5.0 时期,加入了BroadcastState。BroadcastState是对 State API 的一个扩展。它用来存储上游被广播过来的数据,这个 operator 的每个并发上存的BroadcastState里面的数据都是一模一样的,因为它是从上游广播而来的。基于这种State可以比较好地去解决 CEP 中的动态规则的功能,以及 SQL 中不等值Join的场景。

在 Flink 1.6.0 时期,提供了State TTL功能、DataStream Interval Join功能。State
TTL实现了在申请某个State时候可以在指定一个生命周期参数(TTL),指定该state
过了多久之后需要被系统自动清除。在这个版本之前,如果用户想要实现这种状态清理操作需要使用ProcessFunction注册一个Timer,然后利用Timer的回调手动把这个State
清除。从该版本开始,Flink框架可以基于TTL原生地解决这件事情。DataStream Interval Join 使得
区间Join成为可能。例如左流的每一条数据去Join右流前后5分钟之内的数据,这种就是5分钟的区间Join。

Flink High-Level API 历史变迁

技术图片

在 Flink 1.0.0 时期,Table API (结构化数据处理API)和 CEP(复杂事件处理API)这两个框架被首次加入到仓库中。Table API 是一种结构化的高级
API,支持 Java 语言和 Scala 语言,类似于 Spark 的 DataFrame API。Table API 和 SQL非常相近,他们都是一种处理结构化数据的语言,实现上可以共用很多内容。所以在 Flink 1.1.0 里面,社区基于Apache Calcite对整个 Table 模块做了重构,使得同时支持了 Table API 和 SQL 并共用了大部分代码。

在 Flink 1.2.0 时期,社区在Table API和SQL上支持丰富的内置窗口操作,包括Tumbling Window、Sliding Window、Session Window。

在 Flink 1.3.0 时期,社区首次提出了Dynamic Table这个概念,借助Dynamic
Table,流和批之间可以相互进行转换。流可以是一张表,表也可以是一张流,这是流批统一的基础之一。其中Retraction机制是实现Dynamic
Table的基础之一,基于Retraction才能够正确地实现多级Aggregate、多级Join,才能够保证流式 SQL 的语义与结果的正确性。另外,在该版本中还支持了 CEP
算子的可伸缩容(即改变并发)。

在 Flink 1.5.0 时期,在 Table API 和 SQL 上支持了Join操作,包括无限流的 Join 和带窗口的 Join。还添加了 SQL CLI 支持。SQL CLI 提供了一个类似Shell命令的对话框,可以交互式执行查询。

Flink Checkpoint & Recovery 历史变迁

技术图片
Checkpoint机制在Flink很早期的时候就已经支持,是Flink一个很核心的功能,Flink 社区也一直努力提升 Checkpoint 和 Recovery 的效率。

在 Flink 1.0.0 时期,提供了 RocksDB 状态后端的支持,在这个版本之前所有的状态数据只能存在进程的内存里面,JVM 内存是固定大小的,随着数据越来越多总会发生 FullGC 和 OOM 的问题,所以在生产环境中很难应用起来。如果想要存更多数据、更大的State就要用到 RocksDB。RocksDB是一款基于文件的嵌入式数据库,它会把数据存到磁盘,同时又提供高效的读写性能。所以使用RocksDB不会发生OOM这种事情。

在 Flink 1.1.0 时期,支持了 RocksDB Snapshot 的异步化。在之前的版本,RocksDB 的 Snapshot 过程是同步的,它会阻塞主数据流的处理,很影响吞吐量。在支持异步化之后,吞吐量得到了极大的提升。

在 Flink 1.2.0 时期,通过引入KeyGroup的机制,支持了 KeyedState 和 OperatorState 的可扩缩容。也就是支持了对带状态的流计算任务改变并发的功能。

在 Flink 1.3.0 时期,支持了 Incremental Checkpoint (增量检查点)机制。Incemental Checkpoint 的支持标志着 Flink 流计算任务正式达到了生产就绪状态。增量检查点是每次只将本次 checkpoint 期间新增的状态快照并持久化存储起来。一般流计算任务,GB 级别的状态,甚至 TB 级别的状态是非常常见的,如果每次都把全量的状态都刷到分布式存储中,这个效率和网络代价是很大的。如果每次只刷新增的数据,效率就会高很多。在这个版本里面还引入了细粒度的recovery的功能,细粒度的recovery在做恢复的时候,只需要恢复失败节点的联通子图,不用对整个 Job 进行恢复,这样便能够提高恢复效率。

在 Flink 1.5.0 时期,引入了本地状态恢复的机制。因为基于checkpoint机制,会把State持久化地存储到某个分布式存储,比如HDFS,当发生 failover 的时候需要重新把数据从远程HDFS再下载下来,如果这个状态特别大那么下载耗时就会较长,failover 恢复所花的时间也会拉长。本地状态恢复机制会提前将状态文件在本地也备份一份,当Job发生failover之后,恢复时可以在本地直接恢复,不需从远程HDFS重新下载状态文件,从而提升了恢复的效率。

Flink Runtime 历史变迁

技术图片

在 Flink 1.2.0 时期,提供了Async I/O功能。Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,主要目的是为了解决与外部系统交互时网络延迟成为了系统瓶颈的问题。例如,为了关联某些字段需要查询外部 HBase 表,同步的方式是每次查询的操作都是阻塞的,数据流会被频繁的I/O请求卡住。当使用异步I/O之后就可以同时地发起N个异步查询的请求,不会阻塞主数据流,这样便提升了整个job的吞吐量,提升CPU利用率。

在 Flink 1.3.0 时期,引入了HistoryServer的模块。HistoryServer主要功能是当job结束以后,会把job的状态以及信息都进行归档,方便后续开发人员做一些深入排查。

在 Flink 1.4.0 时期,提供了端到端的 exactly-once 的语义保证。Exactly-once 是指每条输入的数据只会作用在最终结果上有且只有一次,即使发生软件或硬件的故障,不会有丢数据或者重复计算发生。而在该版本之前,exactly-once 保证的范围只是 Flink 应用本身,并不包括输出给外部系统的部分。在 failover 时,这就有可能写了重复的数据到外部系统,所以一般会使用幂等的外部系统来解决这个问题。在 Flink 1.4 的版本中,Flink 基于两阶段提交协议,实现了端到端的 exactly-once 语义保证。内置支持了 Kafka 的端到端保证,并提供了 TwoPhaseCommitSinkFunction 供用于实现自定义外部存储的端到端 exactly-once 保证。

在 Flink 1.5.0 时期,Flink 发布了新的部署模型和处理模型(FLIP6)。新部署模型的开发工作已经持续了很久,该模型的实现对Flink核心代码改动特别大,可以说是自 Flink 项目创建以来,Runtime 改动最大的一次。简而言之,新的模型可以在YARN, MESOS调度系统上更好地动态分配资源、动态释放资源,并实现更高的资源利用率,还有提供更好的作业之间的隔离。

除了 FLIP6 的改进,在该版本中,还对网站栈做了重构。重构的原因是在老版本中,上下游多个 task 之间的通信会共享同一个 TCP connection,导致某一个 task 发生反压时,所有共享该连接的 task 都会被阻塞,反压的粒度是 TCP connection 级别的。为了改进反压机制,Flink应用了在解决网络拥塞时一种经典的流控方法——基于Credit的流量控制。使得流控的粒度精细到具体某个 task 级别,有效缓解了反压对吞吐量的影响。

 

2.1 Flink 1.7.0 新特性

在 Flink 1.7.0 中,我们更接近实现快速数据处理和以无缝方式为 Flink 社区构建数据密集型应用程序的目标。最新版本包括一些新功能和改进,例如对 Scala 2.12 的支持、一次性 S3 文件接收器、复杂事件处理与流 SQL 的集成等。

Apache Flink 中对 Scala 2.12 的支持(FLINK-7811)

Apache Flink 1.7.0 是第一个完全支持 Scala 2.12 的版本。这允许用户使用较新的 Scala 版本编写 Flink 应用程序并利用 Scala 2.12 生态系统。

状态演进(FLINK-9376)

许多情况下,由于需求的变化,长期运行的 Flink 应用程序需要在其生命周期内发展。在不失去当前应用程序进度状态的情况下更改用户状态是应用程序发展的关键要求。使用 Flink 1.7.0,社区添加了状态演变,允许您灵活地调整长时间运行的应用程序的用户状态模式,同时保持与以前保存点的兼容性。通过状态演变,可以在状态模式中添加或删除列,以便更改应用程序部署后应用程序捕获的业务功能。现在,使用 Avro 生成时,状态模式演变现在可以立即使用作为用户状态的类,这意味着可以根据 Avro 的规范来演变国家的架构。虽然 Avro 类型是 Flink 1.7 中唯一支持模式演变的内置类型,但社区仍在继续致力于在未来的 Flink 版本中进一步扩展对其他类型的支持。

MATCH RECOGNIZE Streaming SQL 支持(FLINK-6935)

这是 Apache Flink 1.7.0 的一个重要补充,它为 Flink SQL 提供了 MATCH RECOGNIZE 标准的初始支持。此功能结合了复杂事件处理(CEP)和 SQL,可以轻松地对数据流进行模式匹配,从而实现一整套新的用例。此功能目前处于测试阶段,因此我们欢迎社区提供任何反馈和建议。

流式 SQL 中的时态表和时间连接(FLINK-9712)

时态表是 Apache Flink 中的一个新概念,它为表的更改历史提供(参数化)视图,并在特定时间点返回表的内容。例如,我们可以使用具有历史货币汇率的表格。随着时间的推移,这种表格不断增长/发展,并且增加了新的更新汇率。时态表是一种视图,可以将这些汇率的实际状态返回到任何给定的时间点。使用这样的表,可以使用正确的汇率将不同货币的订单流转换为通用货币。时间联接允许使用不断变化/更新的表来进行内存和计算有效的流数据连接。

Streaming SQL 的其他功能

除了上面提到的主要功能外,Flink 的 Table&SQL API 已经扩展到更多用例。以下内置函数被添加到 API:TO_BASE64、LOG2、LTRIM、REPEAT、REPLACE、COSH、SINH、TANH SQL Client 现在支持在环境文件和 CLI 会话中定义视图。此外,CLI 中添加了基本的 SQL 语句自动完成功能。社区添加了一个 Elasticsearch 6 表接收器,允许存储动态表的更新结果。

Kafka 2.0 连接器(FLINK-10598)

Apache Flink 1.7.0 继续添加更多连接器,使其更容易与更多外部系统进行交互。在此版本中,社区添加了 Kafka 2.0 连接器,该连接器允许通过一次性保证读取和写入 Kafka 2.0。

本地恢复(FLINK-9635)

Apache Flink 1.7.0 通过扩展 Flink 的调度来完成本地恢复功能,以便在恢复时考虑以前的部署位置。如果启用了本地恢复,Flink 将保留最新检查点的本地副本任务运行的机器。通过将任务调度到以前的位置,Flink 将通过从本地磁盘读取检查点状态来最小化恢复状态的网络流量。此功能大大提高了恢复速度。

2.2 Flink 1.8.0 新特性

Flink 1.8.0 引入对状态的清理

使用 TTL(生存时间)连续增量清除旧的 Key 状态 Flink 1.8 引入了对 RocksDB 状态后端(FLINK-10471)和堆状态后端(FLINK-10473)的旧数据的连续清理。这意味着旧的数据将(根据 TTL 设置)不断被清理掉。

新增和删除一些 Table API

1) 引入新的 CSV 格式符(FLINK-9964)

此版本为符合 RFC4180 的 CSV 文件引入了新的格式符。新描述符可以使用 org.apache.flink.table.descriptors.Csv。目前,只能与 Kafka 一起使用。旧描述符 org.apache.flink.table.descriptors.OldCsv 用于文件系统连接器。

2) 静态生成器方法在 TableEnvironment(FLINK-11445)上的弃用

为了将 API 与实际实现分开,TableEnvironment.getTableEnvironment() 不推荐使用静态方法。现在推荐使用 Batch/StreamTableEnvironment.create()。

3) 表 API Maven 模块中的更改(FLINK-11064)

之前具有 flink-table 依赖关系的用户需要更新其依赖关系 flink-table-planner,以及正确的依赖关系 flink-table-api-*,具体取决于是使用 Java 还是 Scala: flink-table-api-java-bridge 或者 flink-table-api-scala-bridge。

Kafka Connector 的修改

引入可直接访问 ConsumerRecord 的新 KafkaDeserializationSchema(FLINK-8354),对于 FlinkKafkaConsumers 推出了一个新的 KafkaDeserializationSchema,可以直接访问 KafkaConsumerRecord。

 

一、Flink 1.9.0的里程碑意义

下图展示的是在2019年中阿里技术微信公众号发表的两篇新闻,一篇为“阿里正式向Apache Flink贡献Blink代码”介绍的是在2019年1月Blink开源并且贡献给Apache Flink,另外一篇为“修改代码150万行!Apache Flink 1.9.0做了这些重大修改!”介绍的是2019年8月Bink合并入Flink之后首次发版。之所以将这两篇新闻放在一起,是因为无论是对于Blink还是Flink而言,Flink 1.9.0的发版都是具有里程碑意义的。

技术图片

在2019年年初,Blink开源贡献给Apache Flink的时候,一个要点就是Blink会以Flink的一个分支来支持开源,Blink会将其主要的优化点都Merge到Flink里面,一起将Flink做的更好。如今,都已经过去了半年的时间,随着Flink1.9.0版本的发布,阿里巴巴的Blink团队可以骄傲地宣布自己已经兑现了之前的承诺。因此,当我们结合这两篇报道来看的时候,能够发现当初Blink的一些新功能如今已经能够在Flink1.9.0版本里面看到了,也能看出Flink社区的效率和执行力都是非常高的。

二、Flink 1.9.0的重点改动和新功能

这部分将为大家介绍Flink 1.9.0的重点改动和新功能。

架构升级
整体而言,如果一个软件系统产生了较大改动,那基本上就是架构升级带来的,对于Flink而言也不例外。想必熟悉Flink的同学对于下图中左侧的架构图一定不会陌生,在Flink的分布式流式执行引擎之上有一整套相对独立的DataStream API和DataSet API,它们分别负责流计算作业和批处理作业。在此基础之上Flink还提供了一个流批统一的Table API和SQL,用户可以使用相同的Table API或者SQL来描述流计算作业和批处理作业,只需要在运行时告诉Flink引擎以流模式运行还是以批模式运行即可,Table层将会把作业优化成为DataStream作业或者DataSet作业。但是Flink 1.8版本的架构在底层存在一些弊端,那就是DataStream和DataSet在底层共享的代码并不多。其次,两者的API也完全不同,因此就会导致上层重复开发的工作量比较大,长期来看就会使得Flink的开发和维护成本越来越大。

技术图片

基于上述问题,Blink在架构上进行了一些新型的探索,经过和社区密切的讨论之后确定了Flink未来的架构路线。也就是在Flink未来的版本中,DataSet的API会被完全移除掉,SteamTransformation会作为底层的API来描述批作业和流作业,Table API和SQL会将流作业都翻译到SteamTransformation上,所以在Flink 1.9中为了不影响使用之前版本用户的体验,还需要一种能够让新旧架构并存的方案。基于这个目的,Flink的社区开发人员也做了一系列努力,提出了上图中右侧的Flink 1.9架构设计,将API和实现部分做了模块的拆分,并且提出了一个Planner接口,能够支持不同的Planner具体实现。Planner的具体工作就是优化和翻译成物理执行图等,也就是Flink Query Processor所做的工作。Flink将原本的实现全部移动到了Flink Query Processor中,将从Blink Merge过来的功能都放到了Blink Query Processor。这样就能够实现一举两得,不仅能够使得Table模块拆分之后变得更加清晰,更重要的是也不会影响老版本用户的体验,同时能够使得用户享受到Blink的新功能和优化。

Table API & SQL 重构和新功能

在Table API & SQL 重构和新功能部分,Flink在1.9.0版本中也Merge了大量从Blink中增加的SQL功能。这些新功能都是在阿里巴巴内部经过千锤百炼而沉淀出来的,相信能够使得Flink更上一层台阶。这里挑选了一些比较重要的成果为大家介绍,比如对于SQL DDL的支持,重构了类型系统,高效流式的TopN,高效流式去重,社区关注已久的维表关联,对于MinBatch以及多种解热点手段的支持,完整的批处理支持,Python Table API以及Hive的集成。接下来也会简单介绍下这些新功能。

技术图片

SQL DDL:在以前如果要注册一个Source或者Table Sink,必须要通过Java、Scala等代码或者配置文件进行注册,而在Flink 1.9版本中则支持了SQL DDL的语法直接去注册或者删除表。

重构类型系统:在Flink 1.9版本中实现了一套全新的数据类型系统,这套全新的类型系统与SQL标准进行了完全对齐,能够支持更加丰富的类型。这套全新的类型系统也为未来Flink SQL能够支持更加完备和完善的功能打下了坚实的基础。

TopN:在Flink 1.9版本提供强大的流处理能力以及社区期待已久的TopN来实时计算排行榜,能够实时计算排名靠前的店铺或者进行实时流数据的过滤。

高效流式去重:在现实的生产系统中,很多ETL作业或者任务没有做到端到端的一致性,这就导致明细层可能存在重复数据,这些数据交给汇总层做汇总时就会造成指标偏大,进而多计算了一些值,因此在进入汇总层之前往往都会做一个去重,这里引入了一个流计算中比较高效的去重功能,能够以比较低的代价来过滤重复的数据。

维表关联:能够实时地关联mysql、HBase、Hive表中数据。

MinBatch&多种解热点手段:在性能优化方面,Flink 1.9版本也提供了一些性能优化的手段,比如提升吞吐的MinBatch的优化以及多种解热点手段。

完整的批处理支持:Flink 1.9版本具有完整的批处理支持,在下一个版本中也会继续投入力量来支持TBDS达到开箱即用的高性能。

Python Table API:在Flink 1.9版本中也引入了Python Table API,这也是Flink在多语言方向的有一个重大进步。能够使得Python用户能够轻松地玩转Flink SQL这样的功能。

Hive集成:Hive是Hadoop生态圈中不可忽视的重要力量,为了更好地去推广Flink批处理的功能,与Hive进行集成也是必不可少的。很高兴,在Flink 1.9版本的贡献者中也有两位Hive的PMC来推动集成工作。而首先需要解决的就是Flink如何读取Hive数据的问题,目前Flink已经完整打通了对于Hive MetaStore的访问,Flink可以直接去访问Hive MetaStore中的数据,同时反过来Flink也可以将其表数据中的元信息直接存储到Hive MetaStore里面供Hive访问,同时我们也增加了Hive的Connector支持CSV等格式,用户只需要配置Hive的MetaStore就能够在Flink直接读取。在此基础之上,Flink 1.9版本还增加了Hive自定义函数的兼容,Hive的自定义函数都能够在Flink SQL里面直接运行。

批处理改进:细粒度批作业恢复(FLIP-1)

Flink 1.9版本在批处理部分也做了较多的改进,首要的就是细粒度批作业的恢复。这个优化点在很早之前就被提出来了,而在1.9版本里终于将未完成的功能实现了收尾。在Flink 1.9版本中,如果批处理的作业有错误发生,Flink会首先计算这个错误影响的范围,这称为Fault Region,因为在批处理作业中有一些节点需要通过Pipeline的数据进行传输,而其他的节点可以通过Blocking的方式先把数据存储下来,下游再去读取存储下来的数据,如果算子的输出已经进行了完整的保存,那就没有必要将这个算子重新拉起来运行了,这样就使得错误恢复被控制在一个相对较小的范围里面。如果再极端一点,在每个数据Shuffle的地方都进行数据落盘,这就和MapReduce的Map行为比较类似了,不过Flink支持更加高级的用法,用户可以自行控制每个Shuffle的地方通过网络进行直连还是通过文件落盘的方式进行传输,这也是Flink的一个核心不同点。

技术图片

有了文件Shuffle之后,大家也会想是否能够将这个功能插件化,使其能够将文件Shuffle到其他地方,目前社区也在针对于这个方向做相应的努力,比如可以用Yarn做Shuffle的实现或者做一个分布式服务对于文件进行Shuffle。在阿里内部已经实现了这种架构,实现了单作业处理百TB级别的作业。当Flink具备这种插件化机制以后,就能够轻松地对接更加高效和灵活的Shuffle,让Shuffle这个批处理里面老大难的问题得到较好的解决。

流处理改进:State Processor API(FLIP-43)

流处理一直都是Flink的核心,所以在Flink 1.9版本里面也在流处理方面提出了很多改进,增加了一个非常实用的功能叫做Sate Processor API,其能够帮助用户直接访问Flink中存储的State,API能够帮助用户非常方便地读取、修改甚至重建整个State。这个功能的强大之处在于几个方面,第一个就是灵活地读取外部的数据,比如从一个数据库中读取自主地构建Savepoint,解决作业冷启动问题,这样就不用从N天前开始重跑整个数据。

此外,借助State Processor API,用户可以直接分析State中的数据,因为这部分数据在之前一直属于黑盒,这里面存储的数据是对是错,是否存在异常都用都无从得知,当有了State Processor API之后,用户就可以像分析普通数据一样来分析State数据,进而检测异常和分析故障。第三点就是对于脏数据的订正,比如有一条脏数据污染了State,就可以用State Processor API对于状态进行修复和订正。最后一点就是状态迁移,但用户修改了作业逻辑,还想要复用原来作业中大部分的State,或者想要升级这个State的结构就可以用这个API来完成相应的工作。在流处理面很多常见的工作和问题都可以通过Flink 1.9版本里面提供的State Processor API解决,因此也可以看出这个API的应用前景是非常广泛的。

重构的Web UI

除了上述功能的改进之外,Flink 1.9.0还提供了如下图所示的焕然一新的Web UI。这个最新的前端UI由专业Web前端工程师操刀,采用了最新的AngularJS进行重构。可以看出最新的Web UI非常的清新和现代化,也算是Apache开源软件里面自带UI的一股清流。

技术图片

以上是关于Flink 1.0到1.9特性的主要内容,如果未能解决你的问题,请参考以下文章

FlinkFlink 1.9 升级 到 flink 1.12.4 报错 flink.client.cli.AbstractCustomCommandLine <init>

FlinkFlink 1.9 升级 到 flink 1.12.4 报错 shaded netty4 AbstractChannel AnnotatedConnectException

Flink面试通关手册

Flink 1.9 FlinkKafkaProducer 使用 EXACTLY_ONCE 错误记录

Flink面试,看这篇就足够了

FLinkFlink 1.9 升级到 1.12.4 无配置页面 无日志