Flink SQL 在执行 Select 时内存不足 - 从 RDS 插入到 Mysql

Posted

技术标签:

【中文标题】Flink SQL 在执行 Select 时内存不足 - 从 RDS 插入到 Mysql【英文标题】:Flink SQL running out of memory doing Select - Insert from RDS to Mysql 【发布时间】:2022-01-02 18:52:05 【问题描述】:

在我的管道中,我使用 pyflink 从 RDS 加载和转换数据并接收到 mysql。使用 FLINK CDC,我可以从 RDS 中获取我想要的数据,并使用 JDBC 库接收到 MYSQL。我的目标是在 1 个工作中使用下面的代码示例读取 1 个表并创建 10 个其他表(基本上是在较小的表中打破一个巨大的表)。我面临的问题是,尽管使用 RocksDB 作为状态后端和 flink cdc 中的选项,例如 scan.incremental.snapshot.chunk.sizescan.snapshot.fetch.sizedebezium.min.row. count.to.stream.result,但使用内存不断增长,导致具有 2GB 内存的任务管理器出现故障。我的直觉是,一个简单的 select-insert 查询无论如何都会将所有表加载到内存中!如果是这样,我能以某种方式避免这种情况吗?表大小约为 500k 行。

env = StreamExecutionEnvironment.get_execution_environment()

t_env = StreamTableEnvironment.create(env)
stmt_set = t_env.create_statement_set()


create_kafka_source= (
        """
            CREATE TABLE somethin(
               bla INT,
               bla1 DOUBLE,
               bla2 TIMESTAMP(3),
              PRIMARY KEY(bla2) NOT ENFORCED
         ) WITH (
        'connector'='mysql-cdc',
        'server-id'='1000',
        'debezium.snapshot.mode' = 'when_needed',   
        'debezium.poll.interval.ms'='5000',         
        'hostname'= 'som2',
        'port' ='som2',
        'database-name'='som3',
        'username'='som4',
        'password'='somepass',
        'table-name' = 'atable'
        )
        """
    )
create_kafka_dest = (
        """CREATE TABLE IF NOT EXISTS atable(
                    time1 TIMESTAMP(3),
                    blah2 DOUBLE,
                    PRIMARY KEY(time_stamp)  NOT ENFORCED

                    ) WITH (                       'connector'= 'jdbc',
                    'url' = 'jdbc:mysql://name1:3306/name1',
                    'table-name' = 't1','username' = 'user123',
                    'password' = '123'   
        )"""
    )



t_env.execute_sql(create_kafka_source)
t_env.execute_sql(create_kafka_dest)

stmt_set.add_insert_sql(
    "INSERT INTO atable SELECT  DISTINCT bla2,bla1,"
    "FROM somethin"
)

【问题讨论】:

【参考方案1】:

在流式查询中使用DISTINCT 的成本很高,尤其是在对独特性没有任何时间限制的情况下(例如,计算每天的唯一身份访问者)。我想这就是您的查询需要大量状态的原因。

但是,您应该能够使其正常工作。 RocksDB 并不总是表现良好。有时它会消耗比分配更多的内存。

你使用的是什么版本的 Flink?在 Flink 1.11 中进行了改进(通过切换到 jemalloc),在 Flink 1.14 中进行了进一步的改进(通过升级到更新版本的 RocksDB)。所以升级 Flink 可能会解决这个问题。否则你可能需要撒谎并告诉 Flink 它的内存比实际内存要少,这样当 RocksDB 越界时它不会导致内存不足的错误。

【讨论】:

David 感谢您的回复,确切地说,我在 dockerized 环境中使用 FLINK 1.13。删除不同的(并依赖于主键)完全相同..似乎将整个表加载到内存中。据我了解 select-insert 根本不使用任何状态!我可以以某种方式告诉 Flink 在 MYSQL 中接收后删除所有内容吗? 我想我误诊了这个问题。另一个假设:问题可能在于 Flink SQL 正在具体化传入 CDC/更新流的最终结果。这可以解释为什么它将整个表存储在 Flink 状态。能否分享一下使用 EXPLAIN 的结果,让我们看看执行计划是什么样的? 亲爱的大卫,你可以在这里找到执行计划:gist.github.com/Giwrgosyea/8dcfb28c2b797865eb137dc2a00a46c7

以上是关于Flink SQL 在执行 Select 时内存不足 - 从 RDS 插入到 Mysql的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Flink 中使用本地执行模式执行批处理 sql?

sql语句 select a+b from c b有时是NULL 如何修改语句,在b不等于null时,执行select a+b from c

Flink 1.9 Table & SQL 第一个程序 WordCount

Flink 1.9 Table & SQL 第一个程序 WordCount

游标是不是将 SELECT 结果记录集存储在内存中?

从0到1Flink的成长之路-Flink SQL 常用算子