Flink Checkpoint 和 Large State 调优
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink Checkpoint 和 Large State 调优相关的知识,希望对你有一定的参考价值。
参考技术A 为了使 Flink 应用程序能够可靠地大规模运行,必须满足两个条件:监控 Checkpoint 行为的最简单方法是通过 WebUI 界面。有两个 Checkpoint Metric 最值得关注的是:
理想情况下,这两个值都应该是低值,持续出现较高的值意味着 checkpoint barrier 在 job graph 中缓慢移动,通常是由于 backpressure 存在(没有足够的资源来处理记录)。也可以通过增加处理记录的端到端延迟来观察。
应用程序可以配置固定时间间隔触发 checkpoint。当一个 checkpoint 的完成时间长于固定间隔时,在进行中的 checkpoint 完成之前不会触发下一个(默认情况下,下一个 checkpoint 将在正在进行的 checkpoint 完成后立即触发)。
当 checkpoint 结束的时间经常超过固定间隔时,系统会不断地触发 checkpoint(完成后立即启动新)。这可能意味着在两个 checkpoint 之间,Operator 处理进展过少,并且 checkpoint 占用了过多的资源。此行为对使用异步 checkpoint 的流应用程序的影响较小,但仍可能对整体应用程序性能产生影响。
为了防止这种情况,应用程序可以定义一个 checkpoint 的最小间隔(在最新 checkpoint 结束和下一个 checkpoint 开始前必须经过的最小时间间隔。):
下图说明了这是如何影响 checkpoint 的,避免了 checkpoint 持续不断的进行。
可以配置应用程序允许同时进行多个 checkpoint。当手动触发 savepoint 时,可能与正在进行的 checkpoint 同时进行。
许多大规模 Flink 流计算应用程序的 State 存储使用的是 RocksDB state Backend。扩展性远远超过主内存,并可靠地存储大的 keyed state 。
RocksDB 的性能会因配置而异,下面介绍一些使用 RocksDB state Backend 的最佳实践。
在减少 checkpoint 所需时间方面,开启增量 checkpoint 应该是首要考虑因素之一。与完全 checkpoint 相比,增量 checkpoint 可以显著减少时间,因为只记录与前一次完成的 checkpoint 相比所做的更改。
定时器(Timer)默人存储在 RocksDB 中,当 Job 只有很少的 Timer 时,放在堆上存储可以提高性能。
请小心使用此功能,因为基于堆的 Timer 可能会增加 checkpoint 时间,并且无法在内存之外扩展。
RocksDB State Backend 的性能在很大程度上取决于其可用的内存量。为了提高性能,增加内存会有很大帮助,或者调整内存使用。
默认情况,RocksDB State Backend 使用 Flink 托管内存用于 RocksDBs buffer 和 cache( state.backend.rocksdb.memory.managed: true )。 要调整与内存相关的性能问题,以下步骤可能会有所帮助:
本节讨论如何决定一个 Flink 作业应该使用多少资源才能可靠地运行。容量规划的基本经验法则是:
Flink 为所有 checkpoint 和 savepoint 提供可选的压缩(默认值:off)。目前,压缩总是使用 snappy compression algorithm(version 1.1.4) 但计划在未来支持自定义压缩算法。压缩的粒度是 keyed state 的 key-group,每个 key-group 可以单独压缩,这对于缩放程序非常重要。
压缩可以通过 ExecutionConfig 开启
压缩选项对增量快照(RocksDB)没有影响。
在 Flink 的 checkpoint 中,每个 Task 都会生成一个 State snapshot,然后将其写入分布式存储。每个 Task 通过发送一个描述 State 在分布式存储中的位置的句柄来确认 State 成功写入 JobManager。JobManager 依次从所有 Task 收集句柄,并将绑定到到 checkpoint 对象中。
在恢复的情况下,JobManager 打开最新的 checkpoint 对象并将句柄发送回相应的 Task,然后这些 Task 可以从分布式存储中恢复 State。使用分布式存储来存储 State 有两个重要的优点。首先,存储是容错的,其次,分布式存储中的所有 State 对所有节点都是可访问的,并且可以很容易地重新分配(例如,用于重新缩放)。
然而,使用远程分布式存储也有一个很大的缺点:所有 Task 都必须通过网络从远程位置读取其状态。在一些情况下,恢复可以将 Task 重新安排到与上一次运行相同的 TaskManager 中,但仍然要读取远程状态。这可能会导致大状态的恢复时间长。
任务本地 State 恢复是针对这一类问题,主要思想如下:对于每个 checkpoint,每个 Task 不仅将 State snapshot 写入分布式存储,而且还将 state snapshot 的辅助副本保存在该 Task 所在的本地存储中(例如,本地磁盘或内存中)。State 的主存储必须仍然是分布式存储,因为本地存储不能确保节点故障下的持久性,也不能为其他节点提供重新分发 State 的访问。
对于每个可以重新安排到上一个位置进行恢复的 Task,可以从本地辅助副本恢复 State,并避免远程读取的开销。考虑到许多故障不是节点故障,节点故障通常一次只影响一个或极少数节点,在恢复过程中,大多数 Task 很可能返回到其以前的位置,并发现其本地 State 完好无损,可以有效地缩短恢复时间。
需要注意的是,根据所选的 state backend 和 checkpoint 策略,在创建和存储本地辅助副本时,每个 checkpoint 可能需要一些额外的成本。在大多数情况下,实现只需将对分布式存储的写入复制到本地文件。
任务本地恢复在默认情况下是停用的,可以通过 Flink 的配置开启( state.backend.local-recovery 指定为 false 或 true,还可以在 Job 上设置 CheckpointingOptions.LOCAL_RECOVERY )。
任务本地恢复假设在失败情况下保持分配的 Task 调度,其原理如下:每个 Task 都会记住之前分配的 Slot,在恢复过程中会请求完全相同的 Slot 进行重启。如果 Slot 不可用,任务将从 Resource Manager 请求一个全新的 Slot。
如果一个 TaskManager 不再可用,则之前分配该 TaskManager 上的 Task 必须在其他的 TaskManager 上运行,但是不会让其他可以在原 Slot 上恢复的 Task 改变位置。在这种策略下,会让尽可能多的 Task 在原 Slot 上启动,并从本地恢复 State。
Flink中的Checkpoint和Spark中的Checkpoint区别
文章目录
一、checkpoint
流式应用程序必须 24/7 全天候运行,因此必须能够应对与应用程序逻辑无关的故障(例如,系统故障、JVM 崩溃等)。
1.1、Spark Streaming 的 checkpoint
Spark Streaming 需要通过Checkpoint将必要的数据或者操作进行备份,以便它可以从故障中恢复。检查点有两种类型的数据。
1.1.1、元数据检查点
将定义流计算的信息保存到 HDFS 等容错存储中。这用于从运行流应用程序驱动程序的节点故障中恢复。元数据检查点主要用在Driver进程中恢复程序。
这类信息主要包括以下几方面:
- SparkConf的相关信息
- Dream的相关操作
- 队列中等待处理的Job
1.1.2、数据检查点
将生成的 RDD 保存到可靠的存储中。这在一些跨多个批次组合数据的有状态转换中是必要的。在有状态的转换操作中,Spark Streaming会定期自动设置检查点,以切断上游依赖。
在这样的转换中,生成的 RDD 依赖于前一批的 RDD,这导致依赖链的长度随着时间的推移而不断增加,数据重算所耗费的时间,与依赖连的长度成正比。为了避免恢复时间的无限制增加(与依赖链成正比),有状态转换的中间 RDD 会定期检查点到可靠存储(例如 HDFS)以切断依赖链。
总而言之,元数据检查点主要用于从驱动程序故障中恢复,而数据或 RDD 检查点对于使用状态转换的基本功能也是必需的。
1.2、Flink 的 checkpoint
spark streaming 的 checkpoint 仅仅是针对 driver 的故障恢复做了数据和元数据的 checkpoint。
flink 的 checkpoint 机制 要复杂了很多,它采用的是轻量级的分布式快照,实现了每个算子的快照,及流动中的数据的快照。
二、Exactly-Once Semantics
对于 SparkStreaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰一次处理语义。
Flink 则使用 两阶段提交协议 来解决这个问题。
三、checkpoint的内容
spark 通过 rdd.checkpoint() 将rdd数据缓存指定目录,通过缓存的rdd数据来进行容错
flink 是 生成一个轻量级分布快照
以上是关于Flink Checkpoint 和 Large State 调优的主要内容,如果未能解决你的问题,请参考以下文章
Flink中的Checkpoint和Spark中的Checkpoint区别
Flink中的Checkpoint和Spark中的Checkpoint区别