Flink--状态后端

Posted Shall潇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink--状态后端相关的知识,希望对你有一定的参考价值。

一、是什么

再讲状态后端是什么之前,我要说一句:Flink中状态(State)是什么?

看一下官网怎么说

状态后端: 状态都需要存储到状态后端(StateBackend),然后在checkpoint触发时,将状态持久化到外部存储系统。Flink提供了三种类型的状态后端,分别是基于内存的状态后端(MemoryStateBackend)、基于文件系统的状态后端(FsStateBackend)以及基于RockDB作为存储介质的RocksDB StateBackend

简而言之,状态想要持久化就需要通过状态后端来实现

三、如何用

  • MemoryStateBackend

MemoryStateBackend将状态数据全部存储在JVM堆内存中,包括用户在使用DataStream API中创建的Key/Value State,窗口中缓存的状态数据,以及触发器等数据。MemoryStateBackend具有非常快速和高效的特点,但也具有非常多的限制,最主要的就是内存的容量限制,一旦存储的状态数据过多就会导致系统内存溢出等问题,从而影响整个应用的正常运行。同时如果机器出现问题,整个主机内存中的状态数据都会丢失,进而无法恢复任务中的状态数据。因此从数据安全的角度建议用户尽可能地避免在生产环境中使用MemoryStateBackend。Flink将MemoryStateBackend作为默认状态后端。

MemoryStateBackend比较适合用于测试环境中,并用于本地调试和验证,不建议在生产环境中使用。但如果应用状态数据量不是很大,例如使用了大量的非状态计算算子,也可以在生产环境中使MemoryStateBackend.

  • FsStateBackend

FsStateBackend是基于文件系统的一种状态后端,这里的文件系统可以是本地文件系统,也可以是HDFS分布式文件系统。创建FsStateBackend的构造函数如下:

FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots)

其中path如果为本地路径,其格式为“file:///data/flink/checkpoints”,如果path为HDFS路径,其格式为“hdfs://nameservice/flink/checkpoints”。FsStateBackend中第二个Boolean类型的参数指定是否以同步的方式进行状态数据记录,默认采用异步的方式将状态数据同步到文件系统中,异步方式能够尽可能避免在Checkpoint的过程中影响流式计算任务。如果用户想采用同步的方式进行状态数据的检查点数据,则将第二个参数指定为True即可。

相比于MemoryStateBackend, FsStateBackend更适合任务状态非常大的情况,例如应用中含有时间范围非常长的窗口计算,或Key/value State状态数据量非常大的场景,这时系统内存不足以支撑状态数据的存储。同时FsStateBackend最大的好处是相对比较稳定,在checkpoint时,将状态持久化到像HDFS分布式文件系统中,能最大程度保证状态数据的安全性。

  • RocksDBStateBackend

与前面的状态后端不同,RocksDBStateBackend需要单独引入相关的依赖包。RocksDB 是一个 key/value 的内存存储系统,类似于HBase,是一种内存磁盘混合的 LSM DB。当写数据时会先写进write buffer(类似于HBase的memstore),然后在flush到磁盘文件,当读取数据时会现在block cache(类似于HBase的block cache),所以速度会很快。

RocksDBStateBackend在性能上要比FsStateBackend高一些,主要是因为借助于RocksDB存储了最新热数据,然后通过异步的方式再同步到文件系统中,但RocksDBStateBackend和MemoryStateBackend相比性能就会较弱一些。

需要注意 RocksDB 不支持同步的 Checkpoint,构造方法中没有同步快照这个选项。不过 RocksDB 支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味着并不需要把所有 sst 文件上传到 Checkpoint 目录,仅需要上传新生成的 sst 文件即可。它的 Checkpoint 存储在外部文件系统(本地或HDFS),其容量限制只要单个 TaskManager 上 State 总量不超过它的内存+磁盘,单 Key最大 2G,总大小不超过配置的文件系统容量即可。对于超大状态的作业,例如天级窗口聚合等场景下可以使会用该状态后端。

配置状态后端
Flink默认使用的状态后端是MemoryStateBackend,所以不需要显示配置。对于其他的状态后端,都需要进行显性配置。在Flink中包含了两种级别的StateBackend配置:一种是在程序中进行配置,该配置只对当前应用有效;另外一种是通过 flink-conf.yaml进行全局配置,一旦配置就会对整个Flink集群上的所有应用有效。

1、应用级别配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

如果使用RocksDBStateBackend则需要单独引入rockdb依赖库,如下:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>

使用方式与FsStateBackend类似,如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));

2、集群级别配置

具体的配置项在flink-conf.yaml文件中,如下代码所示,参数state.backend指明StateBackend类型,state.checkpoints.dir配置具体的状态存储路径,代码中使用filesystem作为StateBackend,然后指定相应的HDFS文件路径作为state的checkpoint文件夹。

使用filesystem存储
state.backend: filesystem

checkpoint存储路径
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
如果想用RocksDBStateBackend配置集群级别的状态后端,可以使用下面的配置:

操作RocksDBStateBackend的线程数量,默认值为1
state.backend.rocksdb.checkpoint.transfer.thread.num: 1# 指定RocksDB存储状态数据的本地文件路径
state.backend.rocksdb.localdir: /var/rockdb/checkpoints

用于指定定时器服务的工厂类实现类,默认为“HEAP”,也可以指定为“RocksDB”
state.backend.rocksdb.timer-service.factory: HEAP

以上是关于Flink--状态后端的主要内容,如果未能解决你的问题,请参考以下文章

15-flink-1.10.1-flink 状态后端

15-flink-1.10.1-flink 状态后端

15-flink-1.10.1-flink 状态后端

Flink--状态后端

Flink--状态后端

Flink--状态后端