在 flink 1.13 中配置 RocksDB
Posted
技术标签:
【中文标题】在 flink 1.13 中配置 RocksDB【英文标题】:Configure RocksDB in flink 1.13 【发布时间】:2021-06-04 02:11:31 【问题描述】:我在 Flink 1.13 版本中读过 EmbeddedRocksDBStateBackend
但有大小限制,所以我想保留我之前 Flink 版本 1.11 的当前配置,但关键是这种配置 RocksDB 的方式已被弃用(@987654322 @)。
我已尝试使用 EmbeddedRocksDBStateBackend (new EmbeddedRocksDBStateBackend(true))
进行新配置,但出现此错误:
java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=9126648 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
从 Java 以编程方式为 flink 1.13 配置 RocksDB 状态后端的最佳方法是什么?
【问题讨论】:
【参考方案1】:在 Flink 1.13 中,我们重新组织了状态后端,因为旧的方式导致了对事情如何工作的许多误解。所以这两个问题被解耦了:
-
工作状态的存储位置(状态后端)。 (对于 RocksDB,它应该配置为使用最快的可用本地磁盘。)
检查点的存储位置(检查点存储)。在大多数情况下,这应该是一个分布式文件系统。
使用旧 API,RocksDB 涉及两个不同的文件系统这一事实被传递给 RocksDBStateBackend
构造函数的检查点路径的方式所掩盖。所以那部分配置已经移到别处(见下文)。
此表显示了旧状态后端和新状态后端之间的关系(结合检查点存储):
Legacy State Backend | New State Backend + Checkpoint Storage |
---|---|
MemoryStateBackend |
HashMapStateBackend + JobManagerCheckpointStorage |
FsStateBackend |
HashMapStateBackend + FileSystemCheckpointStorage |
RocksDBStateBackend |
EmbeddedRocksDBStateBackend + FileSystemCheckpointStorage |
在您的情况下,您希望将EmbeddedRocksDBStateBackend
与FileSystemCheckpointStorage
一起使用。您当前遇到的问题是您在 RocksDB 中使用内存检查点存储 (JobManagerCheckpointStorage
),这严重限制了可以检查点的状态量。
您可以通过在flink-conf.yaml
中指定检查点目录来解决此问题
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
或在您的代码中
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
有关完整详细信息,请参阅Migrating from Legacy Backends 上的文档。
【讨论】:
以上是关于在 flink 1.13 中配置 RocksDB的主要内容,如果未能解决你的问题,请参考以下文章
Flink 1.13 新版状态后端 StateBackend 详解