FlinkFlink 指标监测 相关源码
Posted 九师兄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink 指标监测 相关源码相关的知识,希望对你有一定的参考价值。
1.概述
转载:Flink源码系列——指标监测 仅供自己学习,建议去看原文。
1、Metric简介
Flink对于指标监测有一套自己的实现,指标的统计方式有四种,这些指标都实现了Metric这个接口,而Metric这个接口只是一个标识,本身并没有定义如何方法接口,部分子类的继承关系如下所示。
从图中可以看出,Metric这个接口有四个直接子类,分别是:
Gauge —— 最简单的度量指标,只是简单的返回一个值,比如返回一个队列中当前元素的个数;
Counter —— 计数器,在一些情况下,会比Gauge高效,比如通过一个AtomicLong变量来统计一个队列的长度;
Meter —— 吞吐量的度量,也就是一系列事件发生的速率,例如TPS;
Histogram —— 度量值的统计结果,如最大值、最小值、平均值,以及分布情况等。
以MeterView为例,分析一个Metric的具体实现。MeterView还实现View接口,实现View接口的类,表示其会定期的执行update方法,进行数据的更新。
public class MeterView implements Meter, View
/** 底层使用的计算器 */
private final Counter counter;
/** 计算平均值的事件跨度 */
private final int timeSpanInSeconds;
/** 包含历史数据的循环数组 */
private final long[] values;
/** 当前时间在数组中的索引 */
private int time = 0;
/** 最新计算的rate */
private double currentRate = 0;
public MeterView(int timeSpanInSeconds)
this(new SimpleCounter(), timeSpanInSeconds);
public MeterView(Counter counter, int timeSpanInSeconds)
this.counter = counter;
/** 这里的操作是为了让时间跨度刚好是 UPDATE_INTERVAL_SECONDS 的整数倍 */
this.timeSpanInSeconds = timeSpanInSeconds - (timeSpanInSeconds % UPDATE_INTERVAL_SECONDS);
this.values = new long[this.timeSpanInSeconds / UPDATE_INTERVAL_SECONDS + 1];
@Override
public void markEvent()
this.counter.inc();
@Override
public void markEvent(long n)
this.counter.inc(n);
@Override
public long getCount()
return counter.getCount();
@Override
public double getRate()
return currentRate;
@Override
public void update()
time = (time + 1) % values.length;
values[time] = counter.getCount();
currentRate = ((double) (values[time] - values[(time + 1) % values.length]) / timeSpanInSeconds);
从类的属性变量中可以看出,MeterView是在一个Counter计数器的基础之上,封装了一层,从而实现事件每秒的平均速率。以values这个长整型的数组,作为环形数组,实现对最新的历史数据的保存。
在构造函数中,会对入参timeSpanInSeconds这个时间跨度进行修正,使其刚好是UPDATE_INTERVAL_SECONDS的整数倍,另外values数组的长度是timeSpanInSeconds对UPDATE_INTERVAL_SECONDS倍数,再加上1,这样这个数组的最新数据和最老的数据之间的时间间隔就刚好是timeSpanInSeconds。
假设values数组的长度为n,则:
1、索引n-1处的统计值,和索引0处的统计值,时间间隔就是timeSpanInSeconds;
2、由于是环形数组,所以索引0处的统计值,和索引1处的统计值的时间间隔就是timeSpanInSeconds;
3、所以索引i处的统计值,和索引(i+1)%n处的统计值,时间间隔是timeSpanInSeconds;
这个逻辑理清楚了,对update方法的逻辑也就清楚了。
另外,对于Metrics相关概念,可以参考 http://wuchong.me/blog/2015/08/01/getting-started-with-metrics/
2、MetricGroup
为了便于对Metric进行方便的管理和区分,可以对Metric进行分组,MetricGroup就是用来实现这个功能的。
MetricGroup的相关子类的继承关系如下所示。
1、ProxyMetricGroup —— 这是一个代理类,就是把新Metric或者新的子MetricGroup的注册,委托给代理MetricGroup进行处理;
2、AbstractMetricGroup —— 对新增Metric和子MetricGroup的相关方法进行了实现;
在AbstractMetricGroup中有这些属性
protected final A parent;
private final Map<String, Metric> metrics = new HashMap<>();
private final Map<String, AbstractMetricGroup> groups = new HashMap<>();
parent —— 用来保存这个MetricGroup的父MetricGroup
metrics —— 这个map,是用来保存当前MetricGroup中注册的Metric;
groups —— 这个map,是用来保存当前MetricGroup中注册子MetricGroup;
通过这个数据结构可以看出,在MetricGroup中,可以建立一个树状的结构,用来存储和归类相关的Metric。
3、MetricReporter
MetricReporter是用来向外披露Metric的监测结果的接口。
由于MetricReporter的子类在实例化时,都是通过反射机制,所以对于其实现子类,需要有一个公共,无参的构造函数,这个接口的定义如下:
public interface MetricReporter
void open(MetricConfig config);
void close();
void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
open —— 由于子类都是用无参构造函数,通过反射进行实例化,所以相关初始化的工作都是放在这里进行的,并且这个方法需要在实例化后,就需要调用该方法进行相关初始化的工作;
close —— 这里就是在关闭时,进行资源回收等相关操作的;
notifyOfAddedMetric —— 当有一个新的Metric注册时,会调用该方法来通知MetricReporter;
notifyOfRemovedMetric —— 当有一个Metric被移除时,通过这个方法来通知MetricReporter;
4、MetricRegistry
MetricGroup是用来对Metric进行分组管理,MetricReporter是用来对外披露Metric,而MetricRegistry就是这两者之间的桥梁,通过MetricRegistry,就可以让MetricReporter感知到在MetricGroup中的Metric发生的变化情况。
对于MetricRegistry这个接口,其实现为MetricRegistryImpl,而其在实例化时,构造函数的入参是一个MetricRegistryConfiguration实例。
4.1、MetricRegistryConfiguration
MetricRegistryConfiguration顾名思义,就是MetricRegistry的相关配置参数,主要有三个属性,如下:
/** flink中不同组件的范围格式 */
private final ScopeFormats scopeFormats;
/** 字符串的分隔符,这是一个全局的分隔符 */
private final char delimiter;
/** 配置中每个reporter的名称和其对应的配置对象的列表 */
private final List<Tuple2<String, Configuration>> reporterConfigurations;
这些属性,都是从配置参数中获取而来,逻辑如下:
public static MetricRegistryConfiguration fromConfiguration(Configuration configuration)
/** 获取scopeFormats */
ScopeFormats scopeFormats;
try
scopeFormats = ScopeFormats.fromConfig(configuration);
catch (Exception e)
LOG.warn("Failed to parse scope format, using default scope formats", e);
scopeFormats = ScopeFormats.fromConfig(new Configuration());
/** 获取分隔符 */
char delim;
try
delim = configuration.getString(MetricOptions.SCOPE_DELIMITER).charAt(0);
catch (Exception e)
LOG.warn("Failed to parse delimiter, using default delimiter.", e);
delim = '.';
/** 获取MetricReporter相关的配置信息,MetricReporter的配置格式是 metrics.reporters = foo, bar */
final String definedReporters = configuration.getString(MetricOptions.REPORTERS_LIST);
List<Tuple2<String, Configuration>> reporterConfigurations;
if (definedReporters == null)
/** 如果没有配置,则返回空集合 */
reporterConfigurations = Collections.emptyList();
else
/** 按模式匹配分割,如上述的配置,则namedReporters="foo", "bar" */
String[] namedReporters = splitPattern.split(definedReporters);
reporterConfigurations = new ArrayList<>(namedReporters.length);
for (String namedReporter: namedReporters)
/**
* 这里是获取一个代理配置对象,就是在原来配置对象的基础上,在查询key时,需要加上这里配置的前缀,
* 如 metrics.reporter.foo. ,这样就可以获取特定reporter的配置
*/
DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration(
configuration,
ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.');
reporterConfigurations.add(Tuple2.of(namedReporter, (Configuration) delegatingConfiguration));
return new MetricRegistryConfiguration(scopeFormats, delim, reporterConfigurations);
4.1.1 ScopeFormat
上述的ScopeFormats也是配置对象中获取的,如下:
public static ScopeFormats fromConfig(Configuration config)
String jmFormat = config.getString(MetricOptions.SCOPE_NAMING_JM);
String jmJobFormat = config.getString(MetricOptions.SCOPE_NAMING_JM_JOB);
String tmFormat = config.getString(MetricOptions.SCOPE_NAMING_TM);
String tmJobFormat = config.getString(MetricOptions.SCOPE_NAMING_TM_JOB);
String taskFormat = config.getString(MetricOptions.SCOPE_NAMING_TASK);
String operatorFormat = config.getString(MetricOptions.SCOPE_NAMING_OPERATOR);
return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
这里就需要介绍ScopeFormat,其类继承关系如下:
从图中可以看出,Flink中的每个组件,都有对应的格式。
首先看下ScopeFormat中的主要属性对象:
/** 这是原生格式,比如 <host>.jobmanager ,如果为空,则是 <empty> */
private final String format;
/** format按照分割符分割后的数组,如 template= "<host>", "jobmanager”,被<>包裹的元素,是变量元素 */
private final String[] template;
/** 这是template数组中,变量元素的索引,如"<host>"是变量,在template中的索引是0,则 templatePos = 0 */
private final int[] templatePos;
/** 这个是template中变量元素对应的真实的值,在values数组中的位置,详见 构造函数 和 #bindVariables方法 */
private final int[] valuePos;
这里以JobManagerScopeFormat为例进行分析说明,在ScopeFormats中,默认传给JobManagerScopeFormat的构造函数的入参值是 .jobmanager 。
则JobManagerScopeFormat的构造过程如下:
/** format的默认值是 <host>.jobmanager */
public JobManagerScopeFormat(String format)
super(format, null, new String[]
SCOPE_HOST
);
接着看起父类ScopeFormat的构造过程:
/** 接上面,入参值为 format="<host>.jobmanager" ,parent=null , variables="<host>" */
protected ScopeFormat(String format, ScopeFormat parent, String[] variables)
checkNotNull(format, "format is null");
/** 将format这个字符串分割, rawComponents = "<host>", "jobmanager" */
final String[] rawComponents = format.split("\\\\" + SCOPE_SEPARATOR);
/** 根据rawComponents的第一个元素是为"*",来判断是否要继承父组的范围 */
final boolean parentAsPrefix = rawComponents.length > 0 && rawComponents[0].equals(SCOPE_INHERIT_PARENT);
if (parentAsPrefix)
/** 需要继承父组的范围,而父组有是null,则抛出异常 */
if (parent == null)
throw new IllegalArgumentException("Component scope format requires parent prefix (starts with '"
+ SCOPE_INHERIT_PARENT + "'), but this component has no parent (is root component).");
/** 如果以 "*." 开头,则format至少需要有3个字符,否则就是无效字符,设置为 "<empty>" */
this.format = format.length() > 2 ? format.substring(2) : "<empty>";
String[] parentTemplate = parent.template;
int parentLen = parentTemplate.length;
/** 将父组的范围和自身的范围,合并到一起 */
this.template = new String[parentLen + rawComponents.length - 1];
System.arraycopy(parentTemplate, 0, this.template, 0, parentLen);
System.arraycopy(rawComponents, 1, this.template, parentLen, rawComponents.length - 1);
else
/** 不需要继承父组的范围,则直接赋值,format="<host>.jobmanager",template="<host>", "jobmanager" */
this.format = format.isEmpty() ? "<empty>" : format;
this.template = rawComponents;
/** 将 variables="<host>" 转换为map "<host>" -> 0 */
HashMap<String, Integer> varToValuePos = arrayToMap(variables);
List<Integer> templatePos = new ArrayList<>();
List<Integer> valuePos = new ArrayList<>();
for (int i = 0; i < template.length; i++)
final String component = template[i];
/** 检查当前这个组件是否是一个变量 */
if (component != null && component.length() >= 3 &&
component.charAt(0) == '<' && component.charAt(component.length() - 1) == '>')
/** 这是一个变量,则从上面的map中,获取其索引 */
Integer replacementPos = varToValuePos.get(component);
if (replacementPos != null)
templatePos.add(i);
valuePos.add(replacementPos);
this.templatePos = integerListToArray(templatePos);
this.valuePos = integerListToArray(valuePos);
经过这个构造过程,ScopeFormat中的四个属性的值如下:
format = “.jobmanager”
template = “”, “jobmanager”
templatePos = 0
valuePos = 0
对于JobManagerScopeFormat来说,构建一个具体的范围数组的逻辑如下:
public String[] formatScope(String hostname)
/** 获取template数组的一份拷贝,深拷贝 */
final String[] template = copyTemplate();
final String[] values = hostname ;
/** 使用hostname替换掉template中索引为0的元素<host> */
return bindVariables(template, values);
protected final String[] copyTemplate()
String[] copy = new String[template.length];
System.arraycopy(template, 0, copy, 0, template.length);
return copy;
/** 在结合这个逻辑,就知道ScopeFormat中的属性valuePos的作用了 */
protected final String[] bindVariables(String[] template, String[] values)
final int len = templatePos.length;
for (int i = 0; i < len; i++)
template[templatePos[i]] = values[valuePos[i]];
return template;
4.2 MetricRegistryImpl
在获取了MetricRegistryConfiguration实例后,在看MetricRegistryImpl的构造函数的实现逻辑。
this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));
这里给executor这个属性,设置了一个单线程可调度的执行器。
接下来主要看下对MetricReporter相关的初始化工作。
/** 变量配置中配置的reporter的配置 */
for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations)
String namedReporter = reporterConfiguration.f0;
/** reporterConfig是Configuration的子类DelegatingConfiguration,会肯定定义的前缀来找key */
Configuration reporterConfig = reporterConfiguration.f1;
/** 获取MetricReporter的具体实现子类的全限定类型,配置的key如:metrics.reporter.foo.class */
final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
if (className == null)
LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
continue;
try
/** 获取配置的定期执行的时间间隔,key的格式如:metrics.reporter.foo.interval */
String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
TimeUnit timeunit = TimeUnit.SECONDS;
long period = 10;
if (configuredPeriod != null)
try
String[] interval = configuredPeriod.split(" ");
period = Long.parseLong(interval[0]);
timeunit = TimeUnit.valueOf(interval[1]);
catch <以上是关于FlinkFlink 指标监测 相关源码的主要内容,如果未能解决你的问题,请参考以下文章