Flink中如何实现一个自定义MetricReporter
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink中如何实现一个自定义MetricReporter相关的知识,希望对你有一定的参考价值。
参考技术A 在 flink 任务运行的过程中,用户通常想知道任务运行的一些基本指标,比如吞吐量、内存和 cpu 使用情况、checkpoint 稳定性等等。而通过 flink metrics 这些指标都可以轻而易举地获取到,避免任务的运行处于黑盒状态,通过分析这些指标,可以更好的调整任务的资源、定位遇到的问题、对任务进行监控。接下来本文将介绍 flink metrics 的一些基本概念与原理以及实践。Flink 对于指标监测有一套自己的实现,同时 flink 自身系统有一些固定的 metric 数据, 包括系统的一些指标,CPU,内存, IO 或者各个 task 运行的一些指标。指标的统计方式有四种,这些指标都实现了 Metric 这个接口,而 Metric 这个接口只是一个标识,本身并没有定义如何方法接口,部分子类的继承关系如下所示。
从图中可以看出,Metric 这个接口有四个直接子类,分别是:
下面以 Counter 为例,说明 Metric 的具体用法,Counters 通常用来计数,可以通过 inc 或 dec 方法来对计数值进行增加或减少。
获取 Metrics 有三种方法,首先可以在 WebUI 上看到;其次可以通过 RESTful API 获取,RESTful API 对程序比较友好,比如写自动化脚本或程序,自动化运维和测试,通过 RESTful API 解析返回的 Json 格式对程序比较友好;最后,还可以通过 Metric Reporter 获取,监控主要使用 Metric Reporter 功能。
flink 提供了很多外部监控系统的支持:JMX(java 自带的技术,不严格属于第三方)、Graphite、InfluxDB、Prometheus、StatsD、Datadog、Slf4j(直接打 log 里)等,也可以通过实现 org.apache.flink.metrics.reporter.MetricReporter 接口来编写自己的 Reporter。如果想要定期发送报告,可以实现 Scheduled 接口。
Metric Reporter 是如何配置的?首先 Metrics Reporters 的名字用逗号分隔,然后通过 metrics.reporter.jmx.class 的 classname 反射找 reporter,还需要拿到 metrics.reporter.jmx.port 的配置,比如向第三方系统通过网络发送的比较多,但要知道往哪里发,ip 地址、port 信息是比较常见的。
开发者可以实现自己的 reporter,将 metrics 数据导出到不同的系统。
MetricReporter 是用来向外暴露 Metric 的监测结果的接口。由于 MetricReporter 的子类在实例化时,都是通过反射机制,所以对于其实现子类,需要有一个公共、无参的构造函数,这个接口的定义如下:
关注 gzh “HEY DATA” 后台回复关键字 MetricReporter 可获得自定义 MetricReporter 实现例子文件。
Flink SQL 自定义 format
参考技术A 由于 kafka 中的 json 属于嵌套,又不想二次序列化再把它展开,故自定义 format。1.自定义 Factory 实现 DeserializationFormatFactory
2.自定义 DeserializationSchema 实现 DeserializationSchema
为了简单起见,我们自定义一个 NullFormat ,也就是无论 kafka 中的消息是什么都返回 null,相当于 kafka 中没有消息
自定义 Factory
自定义 DeserializationSchema
<h2 id="4">4.使用自定义 Format </h2>
'format' = 'null' Factory 的唯一标识
然后就可以直接执行了
以上是关于Flink中如何实现一个自定义MetricReporter的主要内容,如果未能解决你的问题,请参考以下文章