Flink SQL 在执行 Select 时内存不足 - 从 RDS 插入到 Mysql
Posted
技术标签:
【中文标题】Flink SQL 在执行 Select 时内存不足 - 从 RDS 插入到 Mysql【英文标题】:Flink SQL running out of memory doing Select - Insert from RDS to Mysql 【发布时间】:2022-01-02 18:52:05 【问题描述】:在我的管道中,我使用 pyflink 从 RDS 加载和转换数据并接收到 mysql。使用 FLINK CDC,我可以从 RDS 中获取我想要的数据,并使用 JDBC 库接收到 MYSQL。我的目标是在 1 个工作中使用下面的代码示例读取 1 个表并创建 10 个其他表(基本上是在较小的表中打破一个巨大的表)。我面临的问题是,尽管使用 RocksDB 作为状态后端和 flink cdc 中的选项,例如 scan.incremental.snapshot.chunk.size
和 scan.snapshot.fetch.size
和 debezium.min.row. count.to.stream.result
,但使用内存不断增长,导致具有 2GB 内存的任务管理器出现故障。我的直觉是,一个简单的 select-insert 查询无论如何都会将所有表加载到内存中!如果是这样,我能以某种方式避免这种情况吗?表大小约为 500k 行。
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
stmt_set = t_env.create_statement_set()
create_kafka_source= (
"""
CREATE TABLE somethin(
bla INT,
bla1 DOUBLE,
bla2 TIMESTAMP(3),
PRIMARY KEY(bla2) NOT ENFORCED
) WITH (
'connector'='mysql-cdc',
'server-id'='1000',
'debezium.snapshot.mode' = 'when_needed',
'debezium.poll.interval.ms'='5000',
'hostname'= 'som2',
'port' ='som2',
'database-name'='som3',
'username'='som4',
'password'='somepass',
'table-name' = 'atable'
)
"""
)
create_kafka_dest = (
"""CREATE TABLE IF NOT EXISTS atable(
time1 TIMESTAMP(3),
blah2 DOUBLE,
PRIMARY KEY(time_stamp) NOT ENFORCED
) WITH ( 'connector'= 'jdbc',
'url' = 'jdbc:mysql://name1:3306/name1',
'table-name' = 't1','username' = 'user123',
'password' = '123'
)"""
)
t_env.execute_sql(create_kafka_source)
t_env.execute_sql(create_kafka_dest)
stmt_set.add_insert_sql(
"INSERT INTO atable SELECT DISTINCT bla2,bla1,"
"FROM somethin"
)
【问题讨论】:
【参考方案1】:在流式查询中使用DISTINCT
的成本很高,尤其是在对独特性没有任何时间限制的情况下(例如,计算每天的唯一身份访问者)。我想这就是您的查询需要大量状态的原因。
但是,您应该能够使其正常工作。 RocksDB 并不总是表现良好。有时它会消耗比分配更多的内存。
你使用的是什么版本的 Flink?在 Flink 1.11 中进行了改进(通过切换到 jemalloc),在 Flink 1.14 中进行了进一步的改进(通过升级到更新版本的 RocksDB)。所以升级 Flink 可能会解决这个问题。否则你可能需要撒谎并告诉 Flink 它的内存比实际内存要少,这样当 RocksDB 越界时它不会导致内存不足的错误。
【讨论】:
David 感谢您的回复,确切地说,我在 dockerized 环境中使用 FLINK 1.13。删除不同的(并依赖于主键)完全相同..似乎将整个表加载到内存中。据我了解 select-insert 根本不使用任何状态!我可以以某种方式告诉 Flink 在 MYSQL 中接收后删除所有内容吗? 我想我误诊了这个问题。另一个假设:问题可能在于 Flink SQL 正在具体化传入 CDC/更新流的最终结果。这可以解释为什么它将整个表存储在 Flink 状态。能否分享一下使用 EXPLAIN 的结果,让我们看看执行计划是什么样的? 亲爱的大卫,你可以在这里找到执行计划:gist.github.com/Giwrgosyea/8dcfb28c2b797865eb137dc2a00a46c7以上是关于Flink SQL 在执行 Select 时内存不足 - 从 RDS 插入到 Mysql的主要内容,如果未能解决你的问题,请参考以下文章
sql语句 select a+b from c b有时是NULL 如何修改语句,在b不等于null时,执行select a+b from c
Flink 1.9 Table & SQL 第一个程序 WordCount