How Flink Metrics Work
Posted by 九师兄
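1. Overview
Reposted with additions from: "Flink源码剖析:Metrics运作机制" (Flink Source Code Analysis: How Metrics Work)
1. Introduction to Metrics
1.1 What Are Metrics?
Flink's Metrics collect indicators from inside Flink so that developers can better understand the state of a job or a cluster. Once a cluster is running, it is hard to see what is actually happening inside it: whether it is running slow or fast, whether something is abnormal, and so on. Developers cannot watch the logs of every Task in real time, especially when a job is very large or there are many jobs. In those situations Metrics give developers a clear view of the current state of their jobs.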
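1.2 Metric Types
Metrics come in the following types:
•Counter: accumulates a count, for example the number of records or the number of bytes processed so far; the value only keeps adding up.
•Gauge: the simplest metric; it reports the current value of something.
•Meter: measures throughput, that is, the number of "events" per unit of time. It is a rate: the event count divided by the elapsed time. It can be thought of as an average over a tumbling window.
•Histogram: more complex and less commonly used; it describes the distribution of a set of values, for example quantiles, mean, standard deviation, max, and min.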
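To make the four types concrete, here is a minimal sketch in plain Java. The names (`MetricTypesSketch`, `SimpleCounter`, `readGauge`, `ratePerSecond`, `summarize`) are hypothetical and are not Flink's API; the sketch only illustrates the semantics described above.

```java
import java.util.Arrays;
import java.util.function.Supplier;

// Hypothetical illustration of the four metric types; these are not Flink's classes.
final class MetricTypesSketch {

    // Counter: a value that is only ever accumulated.
    static final class SimpleCounter {
        private long count;

        void inc(long n) {
            count += n;
        }

        long getCount() {
            return count;
        }
    }

    // Gauge: reports whatever the supplier returns at the moment it is read.
    static <T> T readGauge(Supplier<T> gauge) {
        return gauge.get();
    }

    // Meter: a rate, i.e. the event count divided by the elapsed time.
    static double ratePerSecond(long events, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        return events * 1000.0 / elapsedMillis;
    }

    // Histogram: summary statistics over recorded values, returned as {min, max, mean}.
    static double[] summarize(long[] values) {
        if (values.length == 0) {
            throw new IllegalArgumentException("no values recorded");
        }
        long min = Arrays.stream(values).min().getAsLong();
        long max = Arrays.stream(values).max().getAsLong();
        double mean = Arrays.stream(values).average().getAsDouble();
        return new double[] {min, max, mean};
    }

    public static void main(String[] args) {
        SimpleCounter records = new SimpleCounter();
        records.inc(5);
        records.inc(7);
        System.out.println("counter = " + records.getCount());
        System.out.println("gauge = " + readGauge(() -> Runtime.getRuntime().freeMemory()));
        System.out.println("meter = " + ratePerSecond(300, 60_000) + " events/s");
        System.out.println("histogram = " + Arrays.toString(summarize(new long[] {1, 2, 3, 6})));
    }
}
```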
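1.3 Metric Group
Metrics in Flink are organized in a multi-level hierarchy of Groups rather than in a flat structure. A Metric Group plus a Metric Name uniquely identifies a metric.
The Metric Group levels include TaskManagerMetricGroup and TaskManagerJobMetricGroup. Each Job is broken down to the group of a specific task, and a task is further divided into TaskIOMetricGroup and OperatorMetricGroup. An Operator also has its own I/O statistics and some metrics. The overall hierarchy is roughly as shown below. Metrics do not affect the system; they live in different groups, and Flink lets you add your own Groups to build your own levels.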
•TaskManagerMetricGroup
    •TaskManagerJobMetricGroup
        •TaskMetricGroup
            •TaskIOMetricGroup
            •OperatorMetricGroup
                •${User-defined Group} / ${User-defined Metrics}
                •OperatorIOMetricGroup
•JobManagerMetricGroup
    •JobManagerJobMetricGroup
JobManagerMetricGroup is simpler: it corresponds to the Master and has fewer levels.
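The idea that a group path plus a metric name forms a unique identifier can be sketched in a few lines of plain Java. `MetricScopeSketch` and `Group` are hypothetical names, and the sample group names and the `.` delimiter are assumptions chosen for illustration; Flink's real scope formats are configurable and richer than this.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a metric's identifier is its group scope plus its name.
final class MetricScopeSketch {

    static final class Group {
        private final Group parent; // null for a root group
        private final String name;

        Group(Group parent, String name) {
            this.parent = parent;
            this.name = name;
        }

        // Creates a child group one level below this one.
        Group addGroup(String childName) {
            return new Group(this, childName);
        }

        // Scope components from the root group down to this group.
        List<String> scopeComponents() {
            List<String> parts = (parent == null) ? new ArrayList<>() : parent.scopeComponents();
            parts.add(name);
            return parts;
        }

        // Joins the scope components and the metric name with the delimiter.
        String metricIdentifier(String metricName, char delimiter) {
            List<String> parts = scopeComponents();
            parts.add(metricName);
            return String.join(String.valueOf(delimiter), parts);
        }
    }

    public static void main(String[] args) {
        Group operator = new Group(null, "taskmanager")
                .addGroup("myJob")
                .addGroup("myTask")
                .addGroup("myOperator");
        System.out.println(operator.metricIdentifier("numRecordsIn", '.'));
    }
}
```

Because the full path is part of the identifier, two operators can both expose a metric with the same name without colliding.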
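2. How Metrics Work
How do system or user-defined metrics get into third-party storage? There are generally two modes: push and pull. Next, we analyze two reporters, PrometheusReporter and PrometheusPushGatewayReporter, to understand how Metrics work and how push and pull differ.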
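Before reading the reporter source, it may help to see the difference reduced to who initiates the transfer. The sketch below uses hypothetical classes (`MetricStore`, `PullEndpoint`, `PushReporter`) and a plain `List` standing in for the remote gateway; it is not the Prometheus client API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch contrasting pull and push; not the Prometheus client API.
final class PushPullSketch {

    // In-memory container shared by both styles.
    static final class MetricStore {
        private final Map<String, Long> values = new LinkedHashMap<>();

        void set(String name, long value) {
            values.put(name, value);
        }

        // Renders the current values as "name value" lines.
        String render() {
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, Long> e : values.entrySet()) {
                sb.append(e.getKey()).append(' ').append(e.getValue()).append('\n');
            }
            return sb.toString();
        }
    }

    // Pull: the reporter only serves the store; the monitoring system calls scrape().
    static final class PullEndpoint {
        private final MetricStore store;

        PullEndpoint(MetricStore store) {
            this.store = store;
        }

        String scrape() {
            return store.render();
        }
    }

    // Push: the reporter itself sends a snapshot to a remote sink.
    static final class PushReporter {
        private final MetricStore store;
        private final List<String> gateway; // stands in for a remote gateway

        PushReporter(MetricStore store, List<String> gateway) {
            this.store = store;
            this.gateway = gateway;
        }

        void report() {
            gateway.add(store.render());
        }
    }

    public static void main(String[] args) {
        MetricStore store = new MetricStore();
        store.set("numRecordsIn", 10);
        System.out.print(new PullEndpoint(store).scrape());
        List<String> gateway = new ArrayList<>();
        new PushReporter(store, gateway).report();
        System.out.println("snapshots pushed: " + gateway.size());
    }
}
```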
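2.1 Initializing the Reporter
This step performs the initialization work related to the third-party storage.
2.1.1 PrometheusReporter
Starts a local HTTP server; Prometheus communicates with it and pulls metrics.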
public class PrometheusReporter extends AbstractPrometheusReporter {
    // Perform the initialization work related to the third-party storage
    @Override
    public void open(MetricConfig config) {
        super.open(config);
        String portsConfig = config.getString(ARG_PORT, DEFAULT_PORT);
        Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                // internally accesses CollectorRegistry.defaultRegistry
                // For PrometheusReporter, start a local HTTP server that Prometheus pulls data from;
                // for PrometheusPushGatewayReporter, this is instead where a PushGateway client is
                // created from host + port to send data to the PushGateway
                httpServer = new HTTPServer(port);
                this.port = port;
                log.info("Started PrometheusReporter HTTP server on port {}.", port);
                break;
            } catch (IOException ioe) { //assume port conflict
                log.debug("Could not start PrometheusReporter HTTP server on port {}.", port, ioe);
            }
        }
        if (httpServer == null) {
            throw new RuntimeException("Could not start PrometheusReporter HTTP server on any configured port. Ports: " + portsConfig);
        }
    }
}
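The port-fallback loop in `open` can be isolated as follows. This is a sketch under assumptions: `PortFallbackSketch`, `Binder`, and `bindFirstAvailable` are hypothetical names, and `Binder` stands in for "start a server on this port" so the logic can be exercised without opening real sockets.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical sketch of "try each configured port until one binds".
final class PortFallbackSketch {

    // Stand-in for starting a server on a port; throws IOException if the port is taken.
    interface Binder {
        void bind(int port) throws IOException;
    }

    // Returns the first port that binds; fails if every candidate is rejected.
    static int bindFirstAvailable(Iterator<Integer> ports, Binder binder) {
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                binder.bind(port);
                return port;
            } catch (IOException e) {
                // Assume a port conflict and move on to the next candidate.
            }
        }
        throw new IllegalStateException("Could not bind any configured port");
    }

    public static void main(String[] args) {
        // Pretend that ports up to and including 9250 are already in use.
        Binder binder = port -> {
            if (port <= 9250) {
                throw new IOException("port in use: " + port);
            }
        };
        int chosen = bindFirstAvailable(Arrays.asList(9249, 9250, 9251).iterator(), binder);
        System.out.println("bound to port " + chosen);
    }
}
```

Configuring a port range rather than a single port matters when several Flink processes share a host, since each one needs its own port for Prometheus to scrape.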
2.1.2 PrometheusPushGatewayReporter
Creates a PushGateway client and actively pushes metrics to the PushGateway.
public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
    @Override
    public void open(MetricConfig config) {
        super.open(config);
        String host = config.getString(HOST.key(), HOST.defaultValue());
        int port = config.getInteger(PORT.key(), PORT.defaultValue());
        String configuredJobName = config.getString(JOB_NAME.key(), JOB_NAME.defaultValue());
        boolean randomSuffix = config.getBoolean(RANDOM_JOB_NAME_SUFFIX.key(), RANDOM_JOB_NAME_SUFFIX.defaultValue());
        deleteOnShutdown = config.getBoolean(DELETE_ON_SHUTDOWN.key(), DELETE_ON_SHUTDOWN.defaultValue());
        if (host == null || host.isEmpty() || port < 1) {
            throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
        }
        if (randomSuffix) {
            this.jobName = configuredJobName + new AbstractID();
        } else {
            this.jobName = configuredJobName;
        }
        // Create a PushGateway client used to push metrics to the PushGateway
        pushGateway = new PushGateway(host + ':' + port);
        log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName: {}, randomJobNameSuffix:{}, deleteOnShutdown:{}}", host, port, jobName, randomSuffix, deleteOnShutdown);
    }
}
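The validation and job-name handling in this `open` method can be sketched on their own. `JobNameSketch`, `validate`, and `resolveJobName` are hypothetical names, and a dash-free `UUID` is used here only as a stand-in for Flink's `AbstractID`. A likely reason for the random suffix is to keep several processes that share one configured job name from pushing under the same name, though that motivation is an assumption not stated in the source.

```java
import java.util.UUID;

// Hypothetical sketch of the host/port check and the optional random job name suffix.
final class JobNameSketch {

    // Rejects an empty host or a non-positive port, mirroring the check in open().
    static void validate(String host, int port) {
        if (host == null || host.isEmpty() || port < 1) {
            throw new IllegalArgumentException(
                    "Invalid host/port configuration. Host: " + host + " Port: " + port);
        }
    }

    // Appends a random 32-character hex suffix when requested (stand-in for AbstractID).
    static String resolveJobName(String configuredJobName, boolean randomSuffix) {
        if (!randomSuffix) {
            return configuredJobName;
        }
        return configuredJobName + UUID.randomUUID().toString().replace("-", "");
    }

    public static void main(String[] args) {
        validate("localhost", 9091);
        System.out.println(resolveJobName("myFlinkJob", false));
        System.out.println(resolveJobName("myFlinkJob", true));
    }
}
```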
2.2 Registering the Reporter
All configured reporters are registered with the MetricRegistry object, which maintains them in a list.
public class MetricRegistryImpl implements MetricRegistry {
    public MetricRegistryImpl(MetricRegistryConfiguration config, Collection<ReporterSetup> reporterConfigurations) {
        this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
        this.scopeFormats = config.getScopeFormats();
        this.globalDelimiter = config.getDelimiter();
        this.delimiters = new ArrayList<>(10);
        this.terminationFuture = new CompletableFuture<>();
        this.isShutdown = false;
        // second, instantiate any custom configured reporters
        this.reporters = new ArrayList<>(4);
        // Periodic scheduler, used by push-style reporters to report metrics
        // to the external data store at a fixed interval
        this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));
        this.queryService = null;
        this.metricQueryServiceRpcService = null;
        if (reporterConfigurations.isEmpty()) {
            // no reporters defined
            // by default, don't report anything
            LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
        } else {
            // Obtain all reporters according to the reporter types configured in flink-conf.yaml
            for (ReporterSetup reporterSetup : reporterConfigurations) {
                final String namedReporter = reporterSetup.getName();
                try {
                    Optional<String> configuredPeriod = reporterSetup.getIntervalSettings();
                    TimeUnit timeunit = TimeUnit.SECONDS;
                    long period = 10;
                    if (configuredPeriod.isPresent()) {
                        try {
                            String[] interval = configuredPeriod.get().split(" ");
                            period = Long.parseLong(interval[0]);
                            timeunit = TimeUnit.valueOf(interval[1]);
                        } catch (Exception e) {
                            LOG.error("Cannot parse report interval from config: " + configuredPeriod +
                                " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
                                "Using default reporting interval.");
                        }
                    }
                    // The reporter instance
                    final MetricReporter reporterInstance = reporterSetup.getReporter();
                    final String className = reporterInstance.getClass().getName();
                    // Push-style reporter, for example PrometheusPushGatewayReporter
                    if (reporterInstance instanceof Scheduled) {
                        LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
                        // Periodically report metrics to the external data store
                        executor.scheduleWithFixedDelay(
                            new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
                    } else {
                        LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
                    }
                    // Register the current reporter with the MetricRegistry,
                    // which can be thought of as a registration center
                    reporters.add(reporterInstance);
                    String delimiterForReporter = reporterSetup.getDelimiter().orElse(String.valueOf(globalDelimiter));
                    if (delimiterForReporter.length() != 1) {
                        LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter);
                        delimiterForReporter = String.valueOf(globalDelimiter);
                    }
                    this.delimiters.add(delimiterForReporter.charAt(0));
                } catch (Throwable t) {
                    LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
                }
            }
        }
    }
}
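The interval handling in the constructor above (a string such as `10 SECONDS`, with a fallback to 10 seconds when it is missing or malformed) can be isolated into a small helper. `ReportIntervalSketch`, `Interval`, and `parse` are hypothetical names used only for this sketch.

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of parsing "<period> <TIMEUNIT>" with a default fallback.
final class ReportIntervalSketch {

    static final class Interval {
        final long period;
        final TimeUnit unit;

        Interval(long period, TimeUnit unit) {
            this.period = period;
            this.unit = unit;
        }
    }

    // Same default as in the constructor above: 10 seconds.
    static final Interval DEFAULT = new Interval(10, TimeUnit.SECONDS);

    static Interval parse(Optional<String> configured) {
        if (!configured.isPresent()) {
            return DEFAULT;
        }
        try {
            String[] parts = configured.get().split(" ");
            // TimeUnit.valueOf is case-sensitive, so "seconds" is rejected.
            return new Interval(Long.parseLong(parts[0]), TimeUnit.valueOf(parts[1]));
        } catch (RuntimeException e) {
            // Bad number, unknown unit, or missing unit: fall back to the default.
            return DEFAULT;
        }
    }

    public static void main(String[] args) {
        Interval interval = parse(Optional.of("500 MILLISECONDS"));
        System.out.println(interval.period + " " + interval.unit);
    }
}
```

Note that a malformed value does not disable reporting; the reporter is still scheduled, just at the default interval.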
2.3 Collecting Metrics into Memory
When a Flink job starts, its metrics are registered with the MetricRegistry.
// The MetricRegistryImpl class
public class MetricRegistryImpl implements MetricRegistry {
    // Registers a new {@link Metric} with this registry.
    @Override
    public void register(Metric metric, String metricName, AbstractMetricGroup group) {
        for (int i = 0; i < reporters.size(); i++) {
            MetricReporter reporter = reporters.get(i);
            try {
                if (reporter != null) {
                    FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
                    // Registering the metric does the following:
                    // 1. Registers the metric with the static CollectorRegistry.defaultRegistry
                    // 2. Sets the metric's initial value. The data is stored in SimpleCollector#children,
                    //    a ConcurrentMap<List<String>, Child>: the key is the list of tag (label) values
                    //    and the value is the child object. A List's hashCode is computed from all of
                    //    its elements, so the list can serve as a map key
                    reporter.notifyOfAddedMetric(metric, metricName, front);
                }
            } catch (Exception e) {
                LOG.warn("Error while registering metric.", e);
            }
        }
    }
}

public abstract class AbstractPrometheusReporter implements MetricReporter {
    private void addMetric(Metric metric, List<String> dimensionValues, Collector collector) {
        if (metric instanceof Gauge) {
            ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues));
        } else if (metric instanceof Counter) {
            ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues));
        } else if (metric instanceof Meter) {
            ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues));
        } else if (metric instanceof Histogram) {
            ((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues);
        } else {
            log.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
                metric.getClass().getName());
        }
    }
}

public abstract class SimpleCollector<Child> extends Collector {
    public <T extends Collector> T setChild(Child child, String... labelValues) {
        if (labelValues.length != this.labelNames.size()) {
            throw new IllegalArgumentException("Incorrect number of labels.");
        } else {
            // Set the metric's initial value. The data is stored in SimpleCollector#children,
            // a ConcurrentMap<List<String>, Child> keyed by the list of tag (label) values
            this.children.put(Arrays.asList(labelValues), child);
            return (T) this;
        }
    }
}
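The point made in the comments, that a `List<String>` of label values works as a map key because `List` defines `equals` and `hashCode` over its elements, can be checked with a small standalone sketch. `LabeledChildrenSketch` is a hypothetical class that stores plain `Double` values in place of the Prometheus client's child objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of a children map keyed by the list of label values.
final class LabeledChildrenSketch {
    private final List<String> labelNames;
    private final ConcurrentMap<List<String>, Double> children = new ConcurrentHashMap<>();

    LabeledChildrenSketch(String... labelNames) {
        this.labelNames = Arrays.asList(labelNames);
    }

    // Stores (or replaces) the child for one combination of label values.
    void setChild(double value, String... labelValues) {
        if (labelValues.length != labelNames.size()) {
            throw new IllegalArgumentException("Incorrect number of labels.");
        }
        // Copy the values so later changes to the caller's array cannot alter the key.
        children.put(new ArrayList<>(Arrays.asList(labelValues)), value);
    }

    // Looks up a child; an equal list of label values finds the same entry.
    Double getChild(String... labelValues) {
        return children.get(Arrays.asList(labelValues));
    }

    int size() {
        return children.size();
    }

    public static void main(String[] args) {
        LabeledChildrenSketch gauge = new LabeledChildrenSketch("job", "task");
        gauge.setChild(1.0, "job1", "task1");
        gauge.setChild(2.0, "job1", "task2");
        gauge.setChild(3.0, "job1", "task1"); // same label values, so the entry is replaced
        System.out.println(gauge.size() + " children, job1/task1 = " + gauge.getChild("job1", "task1"));
    }
}
```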
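2.4 Sending Metrics to Third-Party Storage
•PrometheusReporter: once the metrics are stored in the Map-based container, its job is done. It simply waits for Prometheus to pull the metrics periodically. This is the pull mode.
•PrometheusPushGatewayReporter: Flink reporters fall into two kinds, those that implement the Scheduled interface and those that do not. A reporter that implements Scheduled periodically sends metrics to the third-party storage, driven by a periodic scheduling thread pool. This is the push mode.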
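The dispatch described in the second bullet can be sketched with the JDK scheduler alone. The `Reporter` and `Scheduled` interfaces and the `PullReporter` and `PushReporter` classes below are simplified, hypothetical stand-ins for Flink's types; only `ScheduledExecutorService.scheduleWithFixedDelay` is a real API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: only reporters that implement Scheduled get a periodic task.
final class ScheduledReporterSketch {

    interface Reporter { }

    // Stand-in for Flink's Scheduled interface.
    interface Scheduled {
        void report();
    }

    // Pull style: nothing to schedule, it waits to be scraped.
    static final class PullReporter implements Reporter { }

    // Push style: counts how many times it was asked to report.
    static final class PushReporter implements Reporter, Scheduled {
        final AtomicInteger reports = new AtomicInteger();

        @Override
        public void report() {
            reports.incrementAndGet();
        }
    }

    // Schedules every Scheduled reporter and returns how many were scheduled.
    static int schedulePushReporters(
            List<Reporter> reporters, ScheduledExecutorService executor, long period, TimeUnit unit) {
        int scheduled = 0;
        for (Reporter reporter : reporters) {
            if (reporter instanceof Scheduled) {
                Scheduled task = (Scheduled) reporter;
                executor.scheduleWithFixedDelay(task::report, period, period, unit);
                scheduled++;
            }
        }
        return scheduled;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        PushReporter push = new PushReporter();
        List<Reporter> reporters = new ArrayList<>();
        reporters.add(new PullReporter());
        reporters.add(push);
        int scheduled = schedulePushReporters(reporters, executor, 50, TimeUnit.MILLISECONDS);
        Thread.sleep(300);
        executor.shutdownNow();
        System.out.println(scheduled + " reporter(s) scheduled, " + push.reports.get() + " report(s) sent");
    }
}
```

With `scheduleWithFixedDelay`, the delay is measured from the end of one run to the start of the next, so a slow push cannot cause reports to pile up.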
See the comments in the source code below:
public class MetricRegistryImpl implements MetricRegistry {
    public MetricRegistryImpl(MetricRegistryConfiguration config, Collection<ReporterSetup> reporterConfigurations) {
        this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
        this.scopeFormats = config.getScopeFormats();
        this.globalDelimiter = config.getDelimiter();
        this.delimiters = new ArrayList<>(10);
        this.terminationFuture = new CompletableFuture<>();
        this.isShutdown = false;
        // second, instantiate any custom configured reporters
        this.reporters = new ArrayList<>(4);
        this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));
        this.queryService = null;
        this.metricQueryServiceRpcService = null;
        if (reporterConfigurations.isEmpty()) {
            // no reporters defined
            // by default, don't report anything
            LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
        } else {
            for (ReporterSetup reporterSetup : reporterConfigurations) {
                final String namedReporter = reporterSetup.getName();
                try {
                    Optional<String> configuredPeriod = reporterSetup.getIntervalSettings();
                    TimeUnit timeunit = TimeUnit.SECONDS;
                    long period = 10;
                    if (configuredPeriod.isPresent()) {
                        try {
                            String[] interval = configuredPeriod.get().split(" ");
                            period = Long.parseLong(interval[0]);
                            timeunit = TimeUnit.valueOf(interval[1]);
                        } catch (Exception e) {
                            LOG.error("Cannot parse report interval from config: " + configuredPeriod +
                                " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
                                "Using default reporting interval.");
                        }
                    }
                    final MetricReporter reporterInstance = reporterSetup.getReporter();
                    final String className = reporterInstance.getClass().getName();
                    // A reporter that implements Scheduled periodically reports metrics to the third-party storage
                    if (reporterInstance instanceof Scheduled) {
                        LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
                        // Start the timer thread that periodically reports metrics
                        executor.scheduleWithFixedDelay(
                            new MetricRegistryImpl