在 flink 1.13 中配置 RocksDB

Posted

技术标签:

【中文标题】在 flink 1.13 中配置 RocksDB【英文标题】:Configure RocksDB in flink 1.13 【发布时间】:2021-06-04 02:11:31 【问题描述】:

我在 Flink 1.13 版本中读过 EmbeddedRocksDBStateBackend 但有大小限制,所以我想保留我之前 Flink 版本 1.11 的当前配置,但关键是这种配置 RocksDB 的方式已被弃用(@987654322 @)。

我已尝试使用 EmbeddedRocksDBStateBackend (new EmbeddedRocksDBStateBackend(true)) 进行新配置,但出现此错误:

java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=9126648 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.

从 Java 以编程方式为 flink 1.13 配置 RocksDB 状态后端的最佳方法是什么?

【问题讨论】:

【参考方案1】:

在 Flink 1.13 中,我们重新组织了状态后端,因为旧的方式导致了对事情如何工作的许多误解。所以这两个问题被解耦了:

    工作状态的存储位置(状态后端)。 (对于 RocksDB,它应该配置为使用最快的可用本地磁盘。) 检查点的存储位置(检查点存储)。在大多数情况下,这应该是一个分布式文件系统。

使用旧 API,RocksDB 涉及两个不同的文件系统这一事实被传递给 RocksDBStateBackend 构造函数的检查点路径的方式所掩盖。所以那部分配置已经移到别处(见下文)。

此表显示了旧状态后端和新状态后端之间的关系(结合检查点存储):

Legacy State Backend New State Backend + Checkpoint Storage
MemoryStateBackend HashMapStateBackend + JobManagerCheckpointStorage
FsStateBackend HashMapStateBackend + FileSystemCheckpointStorage
RocksDBStateBackend EmbeddedRocksDBStateBackend + FileSystemCheckpointStorage

在您的情况下,您希望将EmbeddedRocksDBStateBackendFileSystemCheckpointStorage 一起使用。您当前遇到的问题是您在 RocksDB 中使用内存检查点存储 (JobManagerCheckpointStorage),这严重限制了可以检查点的状态量。

您可以通过在flink-conf.yaml 中指定检查点目录来解决此问题

state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/

# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

或在您的代码中

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");

// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));

有关完整详细信息,请参阅Migrating from Legacy Backends 上的文档。

【讨论】:

以上是关于在 flink 1.13 中配置 RocksDB的主要内容,如果未能解决你的问题,请参考以下文章

Flink 1.13 新版状态后端 StateBackend 详解

大数据开发-Flink-1.13新特性

深入解读 Flink SQL 1.13

深入解读 Flink SQL 1.13

Flink 1.13,面向流批一体的运行时与 DataStream API 优化

Flink 1.13,面向流批一体的运行时与 DataStream API 优化