流计算 Oceanus | Flink JVM 内存超限的分析方法总结

Posted 腾讯云大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了流计算 Oceanus | Flink JVM 内存超限的分析方法总结相关的知识,希望对你有一定的参考价值。

等常见的需要使用 Native 内存且容易造成内存泄漏的第三方库,而且从 GC 日志来看,堆内各个区域远远没有用满,说明余量还是比较充足的。

类来描述的。首先 Flink 的 ResourceManager 会调用 工具类,从用户和系统的各项配置 Configuration 中获取各个内存区域的大小( 对象,不含 Metaspace 和 Overhead 部分)。这中间要考虑到旧版本参数的对象。

这个工具类从上述的 对象中生成的。


进程 (mmap: reserved= (classes (malloc= (mmap: reserved= (thread (stack: reserved= (malloc= (arena= (malloc= (mmap: reserved= (malloc= (mmap: reserved= (malloc= (arena= (malloc= (mmap: reserved= (malloc= (arena= (malloc= (tracking overhead= (malloc= (mmap: reserved=随后在 Flink 参数里加上这些内容:

  • 调用的(例如 Flink MemoryManager 分配的堆外 MemorySegments),中间分支的 init 是 JVM 启动期间分配的,也是正常范围。右边分支主要是 JVM 内部的 ParNew & CMS GC、Class 解析所需的符号表、代码缓存所需的内存,也是正常的。因此并未观察到较大的第三方库造成的内存泄漏情况,因此间接引入第三方库造成内存泄漏的可能性也基本排除了。

    pmap -x JVM进程的PID > sleep 设定值过小(为了给堆内存留出更大空间,在这里只设置了 256MB 的阈值,而实际的内存占用不止这些)。

    参数的值(例如给到 1~2GB),避免余量不够而造成的总内存用量超标的问题。

    下表总结了本文所用的工具和适用场景:

    工具名简介常用使用场景
    jstatJava 自带的命令,可以查看 JVM 的统计信息,例如各类 GC 次数、时长等,各内存区域的使用量等查看各区域内存用量,定位 GC 问题等
    jmapJava 自带的命令,可以生成 JVM 堆内内存的 Dump 文件,也可以查看内存对象分布直方图等获取堆内内存 Dump、查看堆内存中对象分布
    jcmdJava 自带的命令,可以对正在运行的 JVM 发送命令,例如开启和关闭特定参数、触发 GC、查看某些统计信息等开启内置 Flight Recorder、查看 NMT 统计信息等
    Arthas包含了一系列问题定位和 JVM 操控小工具,可用来拦截运行时调用现场,动态代码替换、查看 Classloader、Dump 内存等多种用途各类场景,通常在线使用
    JProfiler非常强大的 JVM 剖析和问题排查工具,可以在线 attach 到远程 JVM,也可以离线分析 Dump 文件各类场景,图形化诊断 JVM 问题
    MATEclipse 推出的自动堆内存泄露分析工具堆内存泄露分析
    NMTJava 自带的功能,可以追踪 JVM 内部各区域的内存分配和使用情况堆外内存分析
    jemalloc + jeprof一个通用的内存管理库,可以替代 glibc 中的 malloc,可以避免很多内存碎片问题,支持记录调用次数和分配量等信息等用于后续分析底层 malloc 调用分析和剖析
    pmapLinux 自带命令,查看某个进程的内存映射信息进程内存映射情况分析


    后续计划

    由于人工排查堆外内存问题的过程相当繁琐,十分依赖定位者的直觉和经验,可复制性弱,工具不统一,效率很低。

    我们正在规划将这些定位流程标准化地集成到我们的流计算 Oceanus 平台上,做到自助、自动诊断,逐步实现我们的愿景:打造大数据产品生态体系的实时化分析利器,成为一个基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定的企业级实时大数据分析平台,实现企业数据价值最大化,加速企业实时化、数字化的建设进程。


    参考阅读

    [1] Flink 官方文档 · 内存模型详解 https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/ops/memory/mem_detail.html

    [2] Flink内存配置 https://www.jianshu.com/p/a29b7b7feaaf

    [3] Flink内存设置思路 https://www.cnblogs.com/lighten/p/13053828.html

    [4] jvm-profiling-tools/async-profiler https://github.com/jvm-profiling-tools/async-profiler

    [5] Memory Analyzer (MAT) https://www.eclipse.org/mat/

    [6] Native Memory Tracking https://docs.oracle.com/javase/8/docs/technotes/guides/vm/nmt-8.html

    [7] 疑案追踪:Spring Boot内存泄露排查记 https://mp.weixin.qq.com/s/aYwIH0TN3nSzNaMR2FN0AA

    [8] jemalloc https://github.com/jemalloc/jemalloc/releases

    流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓



    点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~
    腾讯云大数据

    长按二维码
    关注我们



    Flink 实践教程:入门:写入 Elasticsearch

    作者:腾讯云流计算 Oceanus 团队

     

    流计算 Oceanus 简介

    流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

    本文将为您详细介绍如何使用 datagen 连接器生成随机数据,经过流计算 Oceanus,最终将计算数据存入 Elasticsearch 。

     操作视频

     

    前置准备

    创建 流计算 Oceanus 集群

    进入流计算 Oceanus 控制台,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考流计算 Oceanus 官方文档创建独享集群

    创建 Elasticsearch 集群

    进入Elasticsearch 控制台,点击左上方【新建】,创建 Elasticsearch 实例,具体操作请访问创建 Elasticsearch 集群

    !创建流计算 Oceanus 集群和 Elasticsearch 集群时所选 VPC 必须是同一 VPC。

     

    流计算 Oceanus 作业

    1. 创建 Source

    -- Datagen Connector 可以随机生成一些数据用于测试
    -- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html

    CREATE TABLE random_source (
    f_sequence INT,
    f_random INT,
    f_random_str VARCHAR
    ) WITH (
     \'connector\' = \'datagen\',
     \'rows-per-second\'=\'1\',  -- 每秒产生的数据条数
         
     \'fields.f_sequence.kind\'=\'sequence\',   -- 有界序列(结束后自动停止输出)
     \'fields.f_sequence.start\'=\'1\',         -- 序列的起始值
     \'fields.f_sequence.end\'=\'10000\',       -- 序列的终止值
         
     \'fields.f_random.kind\'=\'random\',       -- 无界的随机数
     \'fields.f_random.min\'=\'1\',             -- 随机数的最小值
     \'fields.f_random.max\'=\'1000\',          -- 随机数的最大值
         
     \'fields.f_random_str.length\'=\'10\'      -- 随机字符串的长度
    );

    2. 创建 Sink

    -- Elasticsearch 只能作为数据目的表(Sink)写入
    -- 注意! 如果您启用了 Elasticsearch 的用户名密码鉴权功能, 目前只能使用 Flink 1.10 的旧语法。若无需鉴权, 则可以使用 Flink 1.11 的新语法。
    -- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector

    CREATE TABLE Student (
      `user_id`   INT,
      `user_name` VARCHAR
    ) WITH (
       \'connector.type\' = \'elasticsearch\', -- 输出到 Elasticsearch

       \'connector.version\' = \'6\',            -- 指定 Elasticsearch 的版本, 例如 \'6\', \'7\'. 注意务必要和所选的内置 Connector 版本一致
       \'connector.hosts\' = \'http://10.0.0.175:9200\',  -- Elasticsearch 的连接地址
       \'connector.index\' = \'Student\',        -- Elasticsearch 的 Index 名
       \'connector.document-type\' = \'stu\',    -- Elasticsearch 的 Document 类型
       \'connector.username\' = \'elastic\',     -- 可选参数: 请替换为实际 Elasticsearch 用户名
       \'connector.password\' = \'xxxxxxxxxx\',  -- 可选参数: 请替换为实际 Elasticsearch 密码

       \'update-mode\' = \'append\',             -- 可选无主键的 \'append\' 模式,或有主键的 \'upsert\' 模式    
       \'connector.key-delimiter\' = \'$\',      -- 可选参数, 复合主键的连接字符 (默认是 _ 符号, 例如 key1_key2_key3)
       \'connector.key-null-literal\' = \'n/a\',  -- 主键为 null 时的替代字符串,默认是 \'null\'
       \'connector.failure-handler\' = \'retry-rejected\',   -- 可选的错误处理。可选择 \'fail\' (抛出异常)、\'ignore\'(忽略任何错误)、\'retry-rejected\'(重试)

       \'connector.flush-on-checkpoint\' = \'true\',   -- 可选参数, 快照时不允许批量写入(flush), 默认为 true
       \'connector.bulk-flush.max-actions\' = \'42\',  -- 可选参数, 每批次最多的条数
       \'connector.bulk-flush.max-size\' = \'42 mb\',  -- 可选参数, 每批次的累计最大大小 (只支持 mb)
       \'connector.bulk-flush.interval\' = \'60000\',  -- 可选参数, 批量写入的间隔 (ms)
       \'connector.connection-max-retry-timeout\' = \'300\',     -- 每次请求的最大超时时间 (ms)

       \'format.type\' = \'json\'        -- 输出数据格式, 目前只支持 \'json\'
    );

    3. 编写业务 SQL

    INSERT INTO Student
    SELECT
    f_sequence   AS user_id,
    f_random_str AS user_name
    FROM random_source;

    4. 选择 Connector

    点击【作业参数】,在【内置 Connector】选择 flink-connector-elasticsearch6,点击【保存】>【发布草稿】运行作业。

    ?新版 Flink 1.13 集群不需要用户选择内置 Connector。其他版本集群请根据实际购买的 Elasticsearch 版本选择对应的 Connector。

    5. 数据查询

    进入Elasticsearch 控制台,点击之前购买的 Elasticsearch 实例,点击右上角【Kibana】,进入 Kibana 查询数据。具体查询方法请参考通过 Kibana 访问集群

     

    总结

    本示例用 Datagen 连接器随机生成数据,经过 流计算 Oceanus 实现最基础的数据转换功能,最后 Sink 到Elasticsearch 中,用户无需提前在 Elasticsearch 中创建索引。

     

     

    关注“腾讯云大数据”公众号,技术交流、最新活动、服务专享一站Get~

    以上是关于流计算 Oceanus | Flink JVM 内存超限的分析方法总结的主要内容,如果未能解决你的问题,请参考以下文章

    流计算 Oceanus | 巧用 Flink 构建高性能 ClickHouse 实时数仓

    Flink 实践教程:入门:写入 Elasticsearch

    实时监控:基于流计算 Oceanus ( Flink ) 实现系统和应用级实时监控

    Flink 实践教程-入门:Jar 作业开发

    腾讯实时计算团队向Flink 1.7.0贡献了36个PR

    Oceanus的实时流式计算实践与优化