状态及Checkpoint调优

Posted 青冬

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了状态及Checkpoint调优相关的知识,希望对你有一定的参考价值。

RocksDB大状态调优

RocksDB是基于LSM Tree实现的,写数据都是先写入到内存中,所有RocksDB的写请求效率较高。RocksDB使用内存结合磁盘的方式来存储数据,每次获取数据时,先从内存中的blockcache中查找,如果没有再去磁盘中查询。

使用RocksDB时,状态大小仅受可用磁盘空间量的限制,性能瓶颈主要在于RocksDB对磁盘的读请求,每次读写操作都必须对数据进行序列化或者反序列化。

当处理性能不够的时候,需要横向扩展并行度提高整个Job的吞吐量。

开启State访问性能监控

Flink 1.13中引入了State访问性能监控 latency trackig state。此功能不局限于State Backend类型,自定义实现的State Backend也可以复用此功能。

State访问性能监控会产生一定的性能影响,所以,默认每100此会做一次取样sample,对不同的State Backend的性能损失影响不同。(RocksDB 大概在1%,Heap 10%)

开启代码:

        state.backend.latency-track.keyed-state-enabled: true

        state.backend.latency-track.sample-interval: 100

        state.backend.latency-track.history-size:128

        state.backend.latency-track.state-name-as-variable:true # 将状态名作为变量 metics的

需要注意的是 单位是ns,所以计算为秒要除以1000,000。如果还达到了10ms,一般认为是比较高了。

开启增量检查点和本地恢复

开启增量检查点

RocksDB是目前唯一可以用于支持有状态流处理应用程序增量检查点的状态后端,可以修改参数开启增量检查点:

        state.backend.incremental:true

或者在代码中指定:

        new EmbeddedRocksDBStateBackend(true);

未开启的时候:

开启后:

开启本地恢复

当Flink任务失败后,可以基于本地的状态信息进行恢复任务,可能不需要从hdfs拉取数据。本地恢复目前仅涵盖键控类型的状态后端(RocksDB)。

        state.backend.local-recoverb:true

RocksDB设置多磁盘

        state.backend.rocksdb.localdir:/data1/flink/rocksdb,/data2/flink/rocksdb,……

调整预定义选项

Flink 针对不同的设置为RocksDB提供了一些预定义的选项集合,其中包含了后续提到的一些参数,如果调整预定义选项后还达不到语气,再去调整后面的Block、Writebuffer等参数。

当前支持的预定义选项后DEFAULT/SPINNING_DISK_OPTIMIZED/SPINNING_DISK_OPTIMIZED_HIGH_MEM/FLASH_SSD_OPTIMIZED。使用SSH时,可以指定为FLASH_SSD_OPTIMIZED。

        state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGE_MEM

代码设置:

        new EmbeddedRocksDBStateBackend();

        .setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);

增大Block缓存

整个RocksDB共享一个block cache,读数据时内存cache大小,该参数越大读数据时的缓存命中率越高,默认大小为8MB,建议设置到64~256MB。

        state.backend.rocksdb.block.cache-size:64mb

注意的是,整个下图的Memory是Managered Memory(整体的0.4)。

增大Write buffer 和level阈值大小

RocksDB中,每个state使用一个Column Family,每个Column Family使用独占的write buffer,默认64MB,建议调大。

        state.backend.rocksdb.writebuffer.size:128mb

调整这个参数通常要适当增加L1层大小阈值max-size-level-base 256mb这个值太小会造成能存放的SST文件过少,层级变多造成查找困难,太大会造成文件过多,合并困难。建议设置为target_file_size_base 64mb的倍数,且不能太小,230mb~640mb。

        state.backend.rocksdb.compaction.level.max-size-level-base:320mb

增大write buffer数量

每个Column Family 对应的writebuffer数量,这实际上时内存中“只读内存表”的最大数量,默认值为2。对于机器磁盘来说,如果内存够大,可以调大到5左右。

        state.backend.rocksdb.writebuffer.count: 5

增大后台线程和Write buffer 合并数

增大线程数

用于后台flush 和合并 sst文件线程数,默认为1.机械磁盘可以更改为4或者更大的值。

        state.backend.rocksdb.thread.num: 4

        1.13已经是4

增大 writebuffer最小合并数

将数据从writebuffer 中 flush到磁盘,需要合并的writebuffer最小数量,默认为1,可以调成3。

        state.backend.rocksdb.writebuffer.number-to-merge: 3

分区索引功能

Flink 1.13版本中对RocksDB增加了分区索引功能,复用了RocksDB的partitioned Index & filter 功能,简单的来说就是对RocksDB的partitioned Index做了多级索引。也就是将内存中的最上层常驻,下层根据需要再load回来,这样就大大降低了数据Swap竞争。

线上测试中,相对于内存比较小的场景中,性能可以提升10倍左右。如果在内存管控下RocksDB性能不如预期,这也能成为一个性能优化点。(适合小内存)

        state.backend.rocksdb.memory.partitioned-index-filters: true

        默认关闭

Checkpoint 设置

一般来说,我们的Checkpoint时间间隔可以设置为分钟级别(1~5min)。对于很大的任务每Checkpoint 访问HDFS比较耗时,可以设置为5~10分钟一次,并且调大两次Checkpoint之间的暂停间隔(4~8min)。

也需要考虑时效性的要求,需要在时效性和性能之间做一个平衡,如果时效性要求高,结合end-to-end时长,设置妙级或毫秒级。

如果Checkpoint语义配置为EXACTLY_ONCE,那么在Checkpoint过程中还需要barrier对齐过程,可以通过Flink Web UI的checkpoint选项卡来查看Checkpoint过程中各个阶段的耗时情况,从而确定倒地是哪个阶段导致Checkpoint时间过长然后针对性解决问题。

间隔、最小等待间隔、最大并发、保留ck。

查看end-to-end时间花费。

以上是关于状态及Checkpoint调优的主要内容,如果未能解决你的问题,请参考以下文章

Flink Checkpoint 和 Large State 调优

关于 Flink checkpoint,都在这里(二)

Flink保证exactly-once机制介绍:checkpoint及TwoPhaseCommitSinkFunction

大数据面试题系列一

Linux系统 MySQL 运行状态及调优

06: mysql索引查找原理及调优