Flink进阶系列--FLIP-27新的Source架构
Posted 打酱油的葫芦娃
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink进阶系列--FLIP-27新的Source架构相关的知识,希望对你有一定的参考价值。
Source 旧架构
在 Flink 1.12之前,开发一个新的 source connector 是通过实现 SourceFunction 接口来完成的。
@Public
public interface SourceFunction<T> extends Function, Serializable
// 当 source 开始发送数据时,run 方法被调用,其参数 SourceContext 用于发送数据。run 方法是一个无限循环,通过一个标识 isRunning 来跳出循环结束 source。
void run(SourceContext<T> ctx) throws Exception;
// 放弃发送数据,一般实现逻辑是修改 isRunning 标识
void cancel();
// Source 上下文
interface SourceContext<T>
// 从数据源中发送1条不含时间戳的数据
void collect(T element);
// 从数据源中发送1条含时间戳的数据
@PublicEvolving
void collectWithTimestamp(T element, long timestamp);
// 发送水印,以声明不再出现含水印之前时间戳的数据
@PublicEvolving
void emitWatermark(Watermark mark);
// 将源标记为暂时空闲
@PublicEvolving
void markAsTemporarilyIdle();
// 返回检查点锁
// 通过 checkpoint 锁来保证状态更新和数据发送的原子性
Object getCheckpointLock();
// 关闭 Source 上下文
void close();
Source 实现如果需要和 Flink 的 RuntimeContext 交互,则需要实现 RichSourceFunction 抽象类:
// 通过继承 AbstractRichFunction 抽象类对 SourceFunction 进行了增强
@Public
public abstract class RichSourceFunction<OUT> extends AbstractRichFunction
implements SourceFunction<OUT>
private static final long serialVersionUID = 1L;
值得一提的是,Flink 在 SourceFunction 之上抽象出了 InputFormatSourceFunction,开发者只需要实现 InputFormat,批模式 source connector(如 HBase)通常基于 InputFormat 实现,当然 InputFormat 也可以用于流模式,在一定程度上体现了批流融合的思想,但整体上来看至少在接口层面上流批并没有完全一致。
SourceFunction-->ParallelSourceFunction-->RichParallelSourceFunction-->InputFormatSourceFunction
在基于 SourceFunction 的开发模式下,以 Kafka Source 为例,见下图,FlinkKafkaConsumer 为 SourceFunction 的实现类,该类中集中了 kafka partition 发现逻辑(KafkaPartitionDiscoverer)、数据读取逻辑(KafkaFetcher)、基于阻塞队列实现的生产者消费者模型(KafkaConsumerThread -> Handover -> SourceContext)等等。
可以发现,这种开发模式存在如下不足:
- 对于批模式和流模式需要不同的处理逻辑,不符合批流融合的业界趋势。
- 数据分片(例如 kafka partition、file source 的文件 split)和实际数据读取逻辑混合在 SourceFunction 中,导致复杂的实现。
- 数据分片在接口中并不明确,这使得很难以独立于 source 的方式实现某些功能,例如事件时间对齐(event-time alignment)、分区 watermarks(per-partition watermarks)、动态数据分片分配、工作窃取(work stealing)。
- 没有更好的方式来优化 Checkpoint 锁,在锁争用下,一些线程(例如 checkpoint 线程)可能无法获得锁。
- 没有通用的构建模式,每个源都需要实现自行实现复杂的线程模型,这使得开发和测试一个新的 source 变得困难,也提高了开发者对现有 source 的作出贡献的门槛。
有鉴于此,Flink 社区提出了 FLIP-27 的改进计划,并在 Flink 1.12 实现了基础框架,在 Flink 1.13 中 kafka、hive 和 file source 已移植到新架构,开源社区的 Flink CDC connector 2.0 也基于新架构实现。
Source 新架构
特性
Source 新架构主要有如下特性:
数据分片和数据读取分离
新 Source 有2个关键组件:
- SplitEnumerator: 负责将数据拆解成多个分片 (如files, partitions等)。
- Reader: 负责从各个分片中读取数据。
SplitEnumerator 类似于旧的批处理 Source 接口的创建分片和分配分片的功能。 它只运行一次,而不是并行运行(社区在规划未来如有需要,将分片的切分也并行化)。
它可以在 JobManager 上运行,也可以在 TaskManager 上的单个任务中运行,当前社区的实现为每个 SplitEnumerator 都将封装在一个 SourceCoordinator 中。 如果有多个源,就会有多个SourceCoordinator。 SourceCoordinators 将在 JobMaster 中运行,但不作为 ExecutionGraph 的一部分。(目前社区对该问题还没有定论,倾向于将 SplitEnumerator 放在 TaskManager 上运行)。
基于 SplitEnumerator 和 Reader 将 split 分配行为和数据读取行为隔离,有助于用户将不同的分区行为和数据读取行为灵活组合起来,避免将两部分的代码耦合在一起,难以维护。
流批统一
基于新架构开发的 Source 既可以工作于批模式也可以工作于流模式,批仅仅是有界的流。大多数情况下,只有 SplitEnumerator 需要感知数据源是否有界。例如对于 FileSource,批模式下 SplitEnumerator 只需要一次性的列出目录下的所有文件,流模式下则需要周期性的列出所有文件,并为新增的文件生成数据分片。对于 KafkaSource,批模式下 SplitEnumerator 列出处有的 partition,并把每个 partition 的当前最新的数据偏移作为数据分片的结束点,流模式下 SplitEnumerator 则把无穷大作为 partition 数据分片的结束点,即会持续的读取每个 partition 的新增数据,流模式下还可以周期性的监测 partition 的变化并为新增的 partitition 生成数据分片。
双向通信
SplitEnumerator 和 SourceReader 之间可以双向通信,SourceReader 可以主动向 SplitEnumerator 请求数据分片实现 pull 模式的数据分片分配(例如 FileSource),SplitEnumerator 也可以把数据分片直接分配给 SourceReader 实现 push 模式的分配(例如 KafkaSource)。此外,根据需要还可以定制化一些消息实现 SplitEnumerator 和 SourceReader 之间的交互需求。基于双向通信的能力,比较容易实现事件时间对齐(event-time alignment)的功能,实现数据分片之间事件时间的均衡推进。
通用线程模型
考虑到外部数据源系统的客户端 API 调用方式的差异(阻塞、非阻塞、异步),SourceReader 在设计上支持单分片串行读取、多分片多路复用、多分片多线程三种模式。
3种模型的典型示例:
- Sequential Single Split (File, database query, most bounded splits)
- Multi-split multiplexed (Kafka, Pulsar, Pravega, …)
- Multi-split multi-threaded (Kinesis, …)
Flink 1.13 内核的 SingleThreadMultiplexSourceReaderBase/SingleThreadFetcherManager 抽象出的框架支持前两种线程模型,开发者基于此开发 source connector 变得容易。例如 FileSource 采用了单分片串行读取模式,在一个数据分片读取后,再向 SplitEnumerator 请求新的数据分片。KafkaSource 采用了多分片多路复用模式,SplitEnumerator 把启动时读取的 partition 列表和定期监测时发现的新的 partition 列表批量分配给 SourceReader,SourceReader 使用 KafkaConsumer API 读取所有分配到的 partition 的数据。
容错
SplitEnumerator 和 SourceReader 通过 Flink 的分布式快照机制持久化状态,发生异常时从状态恢复。通常 SplitEnumerator 状态保存了未分配的数据分片,SourceReader 状态保存了分配的数据分片以及分片读取状态(例如 kafka offset,文件 offset)。例如流模式下 FileSource 的 SplitEnumerator 状态保存了未分配的分片以及处理过的文件列表,并定期监测文件列表的变化,为新增文件生成数据分片;SourceReader 状态保存了当前读取的分片信息和文件读取 offset。
基本实现
流程
当 SplitEnumerator 将新的 split 添加到 SourceReader 时,在将 split 分配给 SplitReader 之前,该新 split 的初始状态将放入由 SourceReaderBase 维护的状态映射中。
Record 通过 RecordsBySplitIds 集合的方式从 SplitReaders 传递到 RecordEmitter。这允许 SplitReader 以批处理方式将 Record 排入队列,从而提高性能。
大多数 SourceReader 需要实现类似于下面的接口:
public interface SplitReader<E, SplitT extends SourceSplit>
RecordsWithSplitIds<E> fetch() throws InterruptedException;
void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);
void wakeUp();
上述接口中除了 wakeUp() 方法外,Flink 线程模型均保证由同一线程执行,从而进行接口实现的时候,无需考虑并发安全性问题,降低了开发的难度,
SplitReader 获取到的数据集合 RecordsWithSplitIds 将依次转递给 RecordEmitter,RecordEmitter 主要完成下述任务:
- 将原始记录类型 转换为最终记录类型 ;
- 为其处理的数据提供 event time 时间戳。
故障转移
SplitEnumerator 的状态包括以下内容:
- 未分配的 splits;
- 已分配但还未成功 checkpoint 的 splits。
SourceReader 的状态包括:
- 已分配的 splits;
- 各个 split 的状态(例如 Kafka 偏移量、HDFS 文件偏移量等)。
在该 FLIP 中,当 SplitEnumerator 失败时,将执行完整的故障转移。虽然可以进行更细粒度的故障转移以仅恢复 SplitEnumerator 的状态,但社区希望在单独的 FLIP 中解决此问题。
当 SourceReader 失败时,失败的 SourceReader 将恢复到其上一个成功的 checkpoint。 SplitEnumerator 将通过将已分配但未 checkpoint 的 split 重新添加回 SplitEnumerator 来部分重置其状态。 在这种情况下,只有失败的子任务及其连接的节点必须重置状态。
参考文献
https://blog.csdn.net/cloudbigdata/article/details/122406155
Flink - FLIP
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
FLIP-1 : Fine Grained Recovery from Task Failures
When a task fails during execution, Flink currently resets the entire execution graph and triggers complete re-execution from the last completed checkpoint. This is more expensive than just re-executing the failed tasks.
如果一个task失败,当前需要完全停掉整个job恢复,这个明显太重了;
proposal
简单的方案,如果一个task失败,就把和它相连的整条pipeline都重启,但如果所有node都和该task相连,那还是要重启整个job
但这个方案太naive了,是否可以尽量的减少重启的范围?
如果要只重启fail的task,以及后续的tasks,而不想重启源,只有cache
每个node,把要发出去的Intermediate Result缓存下来,当一个node的task挂了后, 只需要从上一层node把Intermediate Result从发出来,就可以避免从source重启
至于如何cache Intermediate Result,在memory还是disk,还是其他,只是方案不同
Caching Intermediate Result
This type of data stream caches all elements since the latest checkpoint, possibly spilling them to disk, if the data exceeds the memory capacity.
When a downstream operator restarts from that checkpoint, it can simply re-read that data stream without requiring the producing operator to restart. Applicable to both batch (bounded) and streaming (unbounded) operations. When no checkpoints are used (batch), it needs to cache all data.
Memory-only caching Intermediate Result
Similar to the caching intermediate result, but discards sent data once the memory buffering capacity is exceeded. Acts as a “best effort” helper for recovery, which will bound recovery when checkpoints are frequent enough to hold data in between checkpoints in memory. On the other hand, it comes absolutely for free, it simply used memory that would otherwise not be used anyways.
Blocking Intermediate Result
This is applicable only to bounded intermediate results (batch jobs). It means that the consuming operator starts only after the entire bounded result has been produced. This bounds the cancellations/restarts downstream in batch jobs.
FLIP-2 Extending Window Function Metadata
Right now, in Flink a WindowFunction does not get a lot of information when a window fires.
The signature of WindowFunction is this:
public
interface
WindowFunction<IN, OUT, KEY, W
extends
Window>
extends
Function, Serializable {
void
apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out);
}
i.e , the user code only has access to the key for which the window fired, the window for which we fired and the data of the window itself. In the future, we might like to extend the information available to the user function. We initially propose this as additional information:
-
Why/when did the window fire. Did it fire on time, i.e. when the watermark passed the end of the window. Did it fire early because of a speculative early trigger or did it fire on late-arriving data.
- How many times did we fire before for the current window. This would probably be an increasing index, such that each firing for a window can be uniquely identified.
当前在window functions中暴露出来的信息不够,需要给出更多的信息,比如why,when fire等
FLIP-3 - Organization of Documentation
FLIP-4 : Enhance Window Evictor
Right now, the ability of Window Evictor is limited
- The Evictor is called only before the WindowFunction. (There can be use cases where the elements have to be evicted after the WindowFunction is applied)
- Elements are evicted only from the beginning of the Window. (There can be cases where we need to allow eviction of elements from anywhere within in the Window as per the eviction logic that user wish to implement)
当前Evictor只是在WindowFunction 之前被执行,是否可以在WindowFunction 之后被执行?
当前的接口只是从beginning of the Window开始,是否可以从任意位置开始evict
FLIP-5: Only send data to each taskmanager once for broadcasts
Problem:
We experience some unexpected increase of data sent over the network for broadcasts with increasing number of slots per task manager.
降低在广播时,发送的冗余数据
当前状况是,
要达到的效果是,
每个taskmanager只发送一次
FLIP-6 - Flink Deployment and Process Model - Standalone, Yarn, Mesos, Kubernetes, etc.
核心想法是,把jobmanager的工作分离出来
增加两个新的模块, ResourceManager和dispatcher
The ResourceManager (introduced in Flink 1.1) is the cluster-manager-specific component. There is a generic base class, and specific implementations for:
-
YARN
-
Mesos
-
Standalone-multi-job (Standalone mode)
-
Self-contained-single-job (Docker/Kubernetes)
显然对于不同的资源管理平台,只需要实现不同的ResourceManager
于是JobManager, TaskManager和ResourceManager之间的关系就变成这样
TaskManager,向ResourceManager进行注册,并定期汇报solts的情况
JobManager会向ResourceManager请求slot,然后ResourceManager会选择TaskManager,告诉它向某JobManager提供slots
然后该TaskManager会直接联系JobManager去提供slots
同时JobManager会有slot pool,来保持申请到的slots
The SlotPool is a modification of what is currently the InstanceManager.
这样就算ResourceManager挂掉了,JobManager仍然可以继续使用已经申请的slots
The new design includes the concept of a Dispatcher. The dispatcher accepts job submissions from clients and starts the jobs on their behalf on a cluster manager.
In the future run, the dispatcher will also help with the following aspects:
-
The dispatcher is a cross-job service that can run a long-lived web dashboard
-
Future versions of the dispatcher should receive only HTTP calls and thus can act as a bridge in firewalled clusters
-
The dispatcher never executes code and can thus be viewed as a trusted process. It can run with higher privileges (superuser credentials) and spawn jobs on behalf of other users (acquiring their authentication tokens). Building on that, the dispatcher can manage user authentications
把Dispatcher从JobManager中分离出来的好处,
首先dispatcher是可以跨cluster的,是个long-lived web dashboard,比如后面如果一个cluster或jobmanager挂了,我可以简单的spawn到另外一个
第二,client到dispatcher是基于http的很容易穿过防火墙
第三,dispatcher可以当作类似proxy的作用,比如authentications
所以对于不同的cluster manager的具体架构如下,
Yarn,
Compared to the state in Flink 1.1, the new Flink-on-YARN architecture offers the following benefits:
-
The client directly starts the Job in YARN, rather than bootstrapping a cluster and after that submitting the job to that cluster. The client can hence disconnect immediately after the job was submitted
-
All user code libraries and config files are directly in the Application Classpath, rather than in the dynamic user code class loader
-
Containers are requested as needed and will be released when not used any more
-
The “as needed” allocation of containers allows for different profiles of containers (CPU / memory) to be used for different operators
这个架构把整个Flink都托管在Yarn内部
好处,
你不需要先拉起flink集群,然后再提交job,只需要直接提交job;Yarn的ResourcManager会先拉起Application Master,其中包含Resource Manager和Job Manager;然后当Flink resource manager需要资源时,会先和YARN ResourceManager请求,它会去创建container,其中包含TaskManager;
Mesos,
这个架构和Yarn类似,
Mesos-specific Fault Tolerance Aspects
ResourceManager and JobManager run inside a regular Mesos container. The Dispatcher is responsible for monitoring and restarting those containers in case they fail. The Dispatcher itself must be made highly available by a Mesos service like Marathon
Standalone,
The Standalone Setup is should keep compatibility with current Standalone Setups.
The role of the long running JobManager is now a “local dispatcher” process that spawns JobManagers with Jobs internally. The ResourceManager lives across jobs and handles TaskManager registration.
For highly-available setups, there are multiple dispatcher processes, competing for being leader, similar as the currently the JobManagers do.
Component Design and Details
更具体的步骤,
FLIP-7: Expose metrics to WebInterface
With the introduction of the metric system it is now time to make it easily accessible to users. As the WebInterface is the first stop for users for any details about Flink, it seems appropriate to expose the gathered metrics there as well.
The changes can be roughly broken down into 4 steps:
- Create a data-structure on the Job-/TaskManager containing a metrics snapshot
- Transfer this snapshot to the WebInterface back-end
- Store the snapshot in the WebRuntimeMonitor in an easily accessible way
- Expose the stored metrics to the WebInterface via REST API
FLIP-8: Rescalable Non-Partitioned State
要解决的问题是,当dynamic scaling的时候,如何解决状态的问题
如果没有状态,动态的scaling,需要做的只是把流量分到新的operator的并发上
但是对于状态,当增加并发的时候,需要把状态切分,而减少并发的时候,需要把状态合并
这个就比较麻烦了
同时在Flink里面,状态分为3部分,operator state, the function state and key-value states
其中对于key-value states的方案相对简单一些,https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#
这里基本的思想,就是更细粒度的checkpoint;
原先是以task级别为粒度,这样加载的时候,只能加载一个task,如果一个task扩成2个,task级别的checkpoint也需要切分
而采用更细粒度的checkpoint独立存储,而不依赖task,这样就可以独立于task进行调度
比如对于key-value,创建一个叫key groups的概念,以key group作为一个checkpoint的单元
In order to efficiently distribute key-value states across the cluster, they should be grouped into key groups. Each key group represents a subset of the key space and is checkpointed as an independent unit. The key groups can then be re-assigned to different tasks if the DOP changes.
这样当发生增减operator的并发度的时候,只需要以key group为单位调度到新的operator上,同时在该operator上恢复相应的checkpoint即可,如图
然后,对于non-partitioned operator and function state,这个问题怎么解
比如对于kafkasource,4个partitions,2个source的并发
scaling down后,就会出现下图,左边的情况,因为只有s1 task了,他只会load他自己的checkpoint,而之前s2的checkpoint就没人管了
而理论上,我们是要达到右边的情况的
scaling up后,也会出现下图左边的case,因为S1,S2加载了原来的checkpoint,但是当前其实partition3,partition4已经不再分配到s2了
思路还是一样,把checkpoint的粒度变细,而不依赖于task,
FLIP-9: Trigger DSL
当前支持的trigger方式不够灵活,而且对late element只能drop,需要设计更为灵活和合理的DSL,用于描述Trigger policy
FLIP-10: Unify Checkpoints and Savepoints
Currently checkpoints and savepoints are handled in slightly different ways with respect to storing and restoring them. The main differences are that savepoints 1) are manually triggered, 2) persist checkpoint meta data, and 3) are not automatically discarded.
With this FLIP, I propose to allow to unify checkpoints and savepoints by allowing savepoints to be triggered automatically.
FLIP-11: Table API Stream Aggregations
The Table API is a declarative API to define queries on static and streaming tables. So far, only projection, selection, and union are supported operations on streaming tables. This FLIP proposes to add support for different types of aggregations on top of streaming tables. In particular, we seek to support:
-
Group-window aggregates, i.e., aggregates which are computed for a group of elements. A (time or row-count) window is required to bound the infinite input stream into a finite group.
-
Row-window aggregates, i.e., aggregates which are computed for each row, based on a window (range) of preceding and succeeding rows.
Each type of aggregate shall be supported on keyed/grouped or non-keyed/grouped data streams for streaming tables as well as batch tables.
Since time-windowed aggregates will be the first operation that require the definition of time, this FLIP does also discuss how the Table API handles time characteristics, timestamps, and watermarks.
FLIP-12: Asynchronous I/O Design and Implementation
I/O access, for the most case, is a time-consuming process, making the TPS for single operator much lower than in-memory computing, particularly for streaming job, when low latency is a big concern for users. Starting multiple threads may be an option to handle this problem, but the drawbacks are obvious: The programming model for end users may become more complicated as they have to implement thread model in the operator. Furthermore, they have to pay attention to coordinate with checkpointing.
流最终会碰到外部存储,那就会有IO瓶颈,比如写数据库,那么会阻塞整个流
这个问题怎么解?可以用多线程,这个会让编程模型比较复杂,说白了,不够优雅
所以解决方法其实就是用reactor模型,典型的解决I/O等待的方法
AsyncFunction: Async I/O will be triggered in AsyncFunction.
AsyncWaitOperator: An StreamOperator which will invoke AsyncFunction.
AsyncCollector: For each input streaming record, an AsyncCollector will be created and passed into user\'s callback to get the async i/o result.
AsyncCollectorBuffer: A buffer to keep all AsyncCollectors.
Emitter Thread: A working thread in AsyncCollectorBuffer, being signalled while some of AsyncCollectors have finished async i/o and emitting results to the following opeartors.
对于普通的operator,调用function,然后把数据用collector发送出去
但对于AsyncWaitOperator,无法直接得到结果,所以把AsyncCollector传入callback,当function触发callback的时候,再emit数据
但这样有个问题,emit的顺序是取决于,执行速度,如果对emit顺序没有要求应该可以
但如果模拟同步的行为,理论上,emit的顺序应该等同于收到的顺序
这样就需要一个buffer,去cache所有的AsyncCollector,即AsyncCollectorBuffer
当callback被执行时,某个AsyncCollector会被填充上数据,这个时候被mark成可发送
但是否发送,需要依赖一个外部的emitter
他会check,并决定是否真正的emit这个AsyncCollector,比如check是否它之前的所有的都已经emit,否则需要等待
这里还需要考虑的是, watermark,它必须要等到前面的数据都已经被成功emit后,才能被emit;这样才能保证一致性
以上是关于Flink进阶系列--FLIP-27新的Source架构的主要内容,如果未能解决你的问题,请参考以下文章
Flink 1.14.0 消费 kafka 数据自定义反序列化类
Flink 实战系列Flink 同步 Kafka 数据到 HDFS parquet 格式存储 snappy 压缩