FlinkFlink 指标监测 相关源码

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink 指标监测 相关源码相关的知识,希望对你有一定的参考价值。

1.概述

转载:Flink源码系列——指标监测 仅供自己学习,建议去看原文。

1、Metric简介
Flink对于指标监测有一套自己的实现,指标的统计方式有四种,这些指标都实现了Metric这个接口,而Metric这个接口只是一个标识,本身并没有定义如何方法接口,部分子类的继承关系如下所示。

从图中可以看出,Metric这个接口有四个直接子类,分别是:

Gauge —— 最简单的度量指标,只是简单的返回一个值,比如返回一个队列中当前元素的个数;
Counter —— 计数器,在一些情况下,会比Gauge高效,比如通过一个AtomicLong变量来统计一个队列的长度;
Meter —— 吞吐量的度量,也就是一系列事件发生的速率,例如TPSHistogram —— 度量值的统计结果,如最大值、最小值、平均值,以及分布情况等。

以MeterView为例,分析一个Metric的具体实现。MeterView还实现View接口,实现View接口的类,表示其会定期的执行update方法,进行数据的更新。

public class MeterView implements Meter, View 
   /** 底层使用的计算器 */
   private final Counter counter;
   /** 计算平均值的事件跨度 */
   private final int timeSpanInSeconds;
   /** 包含历史数据的循环数组 */
   private final long[] values;
   /** 当前时间在数组中的索引 */
   private int time = 0;
   /** 最新计算的rate */
   private double currentRate = 0;

   public MeterView(int timeSpanInSeconds) 
      this(new SimpleCounter(), timeSpanInSeconds);
   

   public MeterView(Counter counter, int timeSpanInSeconds) 
      this.counter = counter;
      /** 这里的操作是为了让时间跨度刚好是 UPDATE_INTERVAL_SECONDS 的整数倍 */
      this.timeSpanInSeconds = timeSpanInSeconds - (timeSpanInSeconds % UPDATE_INTERVAL_SECONDS);
      this.values = new long[this.timeSpanInSeconds / UPDATE_INTERVAL_SECONDS + 1];
   

   @Override
   public void markEvent() 
      this.counter.inc();
   

   @Override
   public void markEvent(long n) 
      this.counter.inc(n);
   

   @Override
   public long getCount() 
      return counter.getCount();
   

   @Override
   public double getRate() 
      return currentRate;
   

   @Override
   public void update() 
      time = (time + 1) % values.length;
      values[time] = counter.getCount();
      currentRate =  ((double) (values[time] - values[(time + 1) % values.length]) / timeSpanInSeconds);
   

从类的属性变量中可以看出,MeterView是在一个Counter计数器的基础之上,封装了一层,从而实现事件每秒的平均速率。以values这个长整型的数组,作为环形数组,实现对最新的历史数据的保存。
在构造函数中,会对入参timeSpanInSeconds这个时间跨度进行修正,使其刚好是UPDATE_INTERVAL_SECONDS的整数倍,另外values数组的长度是timeSpanInSeconds对UPDATE_INTERVAL_SECONDS倍数,再加上1,这样这个数组的最新数据和最老的数据之间的时间间隔就刚好是timeSpanInSeconds。
假设values数组的长度为n,则:

1、索引n-1处的统计值,和索引0处的统计值,时间间隔就是timeSpanInSeconds;
2、由于是环形数组,所以索引0处的统计值,和索引1处的统计值的时间间隔就是timeSpanInSeconds;
3、所以索引i处的统计值,和索引(i+1)%n处的统计值,时间间隔是timeSpanInSeconds;



这个逻辑理清楚了,对update方法的逻辑也就清楚了。

另外,对于Metrics相关概念,可以参考 http://wuchong.me/blog/2015/08/01/getting-started-with-metrics/

2、MetricGroup
为了便于对Metric进行方便的管理和区分,可以对Metric进行分组,MetricGroup就是用来实现这个功能的。
MetricGroup的相关子类的继承关系如下所示。

1ProxyMetricGroup —— 这是一个代理类,就是把新Metric或者新的子MetricGroup的注册,委托给代理MetricGroup进行处理;
2AbstractMetricGroup —— 对新增Metric和子MetricGroup的相关方法进行了实现;



在AbstractMetricGroup中有这些属性

protected final A parent;
private final Map<String, Metric> metrics = new HashMap<>();
private final Map<String, AbstractMetricGroup> groups = new HashMap<>();
parent —— 用来保存这个MetricGroup的父MetricGroup
metrics —— 这个map,是用来保存当前MetricGroup中注册的Metric;
groups —— 这个map,是用来保存当前MetricGroup中注册子MetricGroup

通过这个数据结构可以看出,在MetricGroup中,可以建立一个树状的结构,用来存储和归类相关的Metric。

3、MetricReporter
MetricReporter是用来向外披露Metric的监测结果的接口。
由于MetricReporter的子类在实例化时,都是通过反射机制,所以对于其实现子类,需要有一个公共,无参的构造函数,这个接口的定义如下:

public interface MetricReporter 
   void open(MetricConfig config);
   void close();
   void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
   void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);

open —— 由于子类都是用无参构造函数,通过反射进行实例化,所以相关初始化的工作都是放在这里进行的,并且这个方法需要在实例化后,就需要调用该方法进行相关初始化的工作;
close —— 这里就是在关闭时,进行资源回收等相关操作的;
notifyOfAddedMetric —— 当有一个新的Metric注册时,会调用该方法来通知MetricReporter;
notifyOfRemovedMetric —— 当有一个Metric被移除时,通过这个方法来通知MetricReporter

4、MetricRegistry
MetricGroup是用来对Metric进行分组管理,MetricReporter是用来对外披露Metric,而MetricRegistry就是这两者之间的桥梁,通过MetricRegistry,就可以让MetricReporter感知到在MetricGroup中的Metric发生的变化情况。
对于MetricRegistry这个接口,其实现为MetricRegistryImpl,而其在实例化时,构造函数的入参是一个MetricRegistryConfiguration实例。

4.1、MetricRegistryConfiguration
MetricRegistryConfiguration顾名思义,就是MetricRegistry的相关配置参数,主要有三个属性,如下:

/** flink中不同组件的范围格式 */
private final ScopeFormats scopeFormats;

/** 字符串的分隔符,这是一个全局的分隔符 */
private final char delimiter;

/** 配置中每个reporter的名称和其对应的配置对象的列表 */
private final List<Tuple2<String, Configuration>> reporterConfigurations;

这些属性,都是从配置参数中获取而来,逻辑如下:

public static MetricRegistryConfiguration fromConfiguration(Configuration configuration) 
   /** 获取scopeFormats */
   ScopeFormats scopeFormats;
   try 
      scopeFormats = ScopeFormats.fromConfig(configuration);
    catch (Exception e) 
      LOG.warn("Failed to parse scope format, using default scope formats", e);
      scopeFormats = ScopeFormats.fromConfig(new Configuration());
   

   /** 获取分隔符 */
   char delim;
   try 
      delim = configuration.getString(MetricOptions.SCOPE_DELIMITER).charAt(0);
    catch (Exception e) 
      LOG.warn("Failed to parse delimiter, using default delimiter.", e);
      delim = '.';
   

   /** 获取MetricReporter相关的配置信息,MetricReporter的配置格式是 metrics.reporters = foo, bar */
   final String definedReporters = configuration.getString(MetricOptions.REPORTERS_LIST);
   List<Tuple2<String, Configuration>> reporterConfigurations;

   if (definedReporters == null) 
      /** 如果没有配置,则返回空集合 */
      reporterConfigurations = Collections.emptyList();
    else 
      /** 按模式匹配分割,如上述的配置,则namedReporters="foo", "bar" */
      String[] namedReporters = splitPattern.split(definedReporters);

      reporterConfigurations = new ArrayList<>(namedReporters.length);

      for (String namedReporter: namedReporters) 
         /** 
          * 这里是获取一个代理配置对象,就是在原来配置对象的基础上,在查询key时,需要加上这里配置的前缀,
          * 如 metrics.reporter.foo. ,这样就可以获取特定reporter的配置 
          */
         DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration(
            configuration,
            ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.');

         reporterConfigurations.add(Tuple2.of(namedReporter, (Configuration) delegatingConfiguration));
      
   

   return new MetricRegistryConfiguration(scopeFormats, delim, reporterConfigurations);

4.1.1 ScopeFormat
上述的ScopeFormats也是配置对象中获取的,如下:

public static ScopeFormats fromConfig(Configuration config) 
   String jmFormat = config.getString(MetricOptions.SCOPE_NAMING_JM);
   String jmJobFormat = config.getString(MetricOptions.SCOPE_NAMING_JM_JOB);
   String tmFormat = config.getString(MetricOptions.SCOPE_NAMING_TM);
   String tmJobFormat = config.getString(MetricOptions.SCOPE_NAMING_TM_JOB);
   String taskFormat = config.getString(MetricOptions.SCOPE_NAMING_TASK);
   String operatorFormat = config.getString(MetricOptions.SCOPE_NAMING_OPERATOR);

   return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);

这里就需要介绍ScopeFormat,其类继承关系如下:

从图中可以看出,Flink中的每个组件,都有对应的格式。
首先看下ScopeFormat中的主要属性对象:

/** 这是原生格式,比如 <host>.jobmanager ,如果为空,则是 <empty> */
private final String format;

/** format按照分割符分割后的数组,如 template= "<host>", "jobmanager”,被<>包裹的元素,是变量元素 */
private final String[] template;

/** 这是template数组中,变量元素的索引,如"<host>"是变量,在template中的索引是0,则 templatePos = 0 */
private final int[] templatePos;

/** 这个是template中变量元素对应的真实的值,在values数组中的位置,详见 构造函数 和 #bindVariables方法 */
private final int[] valuePos;

这里以JobManagerScopeFormat为例进行分析说明,在ScopeFormats中,默认传给JobManagerScopeFormat的构造函数的入参值是 .jobmanager 。
则JobManagerScopeFormat的构造过程如下:

/** format的默认值是 <host>.jobmanager */
public JobManagerScopeFormat(String format) 
   super(format, null, new String[] 
      SCOPE_HOST
   );

接着看起父类ScopeFormat的构造过程:

/** 接上面,入参值为 format="<host>.jobmanager" ,parent=null , variables="<host>" */
protected ScopeFormat(String format, ScopeFormat parent, String[] variables) 
   checkNotNull(format, "format is null");

   /** 将format这个字符串分割, rawComponents = "<host>", "jobmanager" */
   final String[] rawComponents = format.split("\\\\" + SCOPE_SEPARATOR);

   /** 根据rawComponents的第一个元素是为"*",来判断是否要继承父组的范围 */
   final boolean parentAsPrefix = rawComponents.length > 0 && rawComponents[0].equals(SCOPE_INHERIT_PARENT);
   if (parentAsPrefix) 
      /** 需要继承父组的范围,而父组有是null,则抛出异常 */
      if (parent == null) 
         throw new IllegalArgumentException("Component scope format requires parent prefix (starts with '"
            + SCOPE_INHERIT_PARENT + "'), but this component has no parent (is root component).");
      

      /** 如果以 "*." 开头,则format至少需要有3个字符,否则就是无效字符,设置为 "<empty>" */
      this.format = format.length() > 2 ? format.substring(2) : "<empty>";

      String[] parentTemplate = parent.template;
      int parentLen = parentTemplate.length;

      /** 将父组的范围和自身的范围,合并到一起 */
      this.template = new String[parentLen + rawComponents.length - 1];
      System.arraycopy(parentTemplate, 0, this.template, 0, parentLen);
      System.arraycopy(rawComponents, 1, this.template, parentLen, rawComponents.length - 1);
   
   else 
      /** 不需要继承父组的范围,则直接赋值,format="<host>.jobmanager",template="<host>", "jobmanager" */
      this.format = format.isEmpty() ? "<empty>" : format;
      this.template = rawComponents;
   

   /** 将 variables="<host>" 转换为map "<host>" -> 0 */
   HashMap<String, Integer> varToValuePos = arrayToMap(variables);
   List<Integer> templatePos = new ArrayList<>();
   List<Integer> valuePos = new ArrayList<>();

   for (int i = 0; i < template.length; i++) 
      final String component = template[i];
      /** 检查当前这个组件是否是一个变量 */
      if (component != null && component.length() >= 3 &&
            component.charAt(0) == '<' && component.charAt(component.length() - 1) == '>') 
         /** 这是一个变量,则从上面的map中,获取其索引 */
         Integer replacementPos = varToValuePos.get(component);
         if (replacementPos != null) 
            templatePos.add(i);
            valuePos.add(replacementPos);
         
      
   

   this.templatePos = integerListToArray(templatePos);
   this.valuePos = integerListToArray(valuePos);

经过这个构造过程,ScopeFormat中的四个属性的值如下:

format =.jobmanager”
template = “”, “jobmanager”
templatePos = 0
valuePos = 0

对于JobManagerScopeFormat来说,构建一个具体的范围数组的逻辑如下:

public String[] formatScope(String hostname) 
   /** 获取template数组的一份拷贝,深拷贝 */
   final String[] template = copyTemplate();
   final String[] values =  hostname ;
   /** 使用hostname替换掉template中索引为0的元素<host> */
   return bindVariables(template, values);


protected final String[] copyTemplate() 
   String[] copy = new String[template.length];
   System.arraycopy(template, 0, copy, 0, template.length);
   return copy;


/** 在结合这个逻辑,就知道ScopeFormat中的属性valuePos的作用了 */
protected final String[] bindVariables(String[] template, String[] values) 
   final int len = templatePos.length;
   for (int i = 0; i < len; i++) 
      template[templatePos[i]] = values[valuePos[i]];
   
   return template;

4.2 MetricRegistryImpl
在获取了MetricRegistryConfiguration实例后,在看MetricRegistryImpl的构造函数的实现逻辑。

this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));


这里给executor这个属性,设置了一个单线程可调度的执行器。
接下来主要看下对MetricReporter相关的初始化工作。

/** 变量配置中配置的reporter的配置 */
for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) 
   String namedReporter = reporterConfiguration.f0;
   /** reporterConfig是Configuration的子类DelegatingConfiguration,会肯定定义的前缀来找key */
   Configuration reporterConfig = reporterConfiguration.f1;

   /** 获取MetricReporter的具体实现子类的全限定类型,配置的key如:metrics.reporter.foo.class */
   final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
   if (className == null) 
      LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
      continue;
   

   try 
      /** 获取配置的定期执行的时间间隔,key的格式如:metrics.reporter.foo.interval */
      String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
      TimeUnit timeunit = TimeUnit.SECONDS;
      long period = 10;

      if (configuredPeriod != null) 
         try 
            String[] interval = configuredPeriod.split(" ");
            period = Long.parseLong(interval[0]);
            timeunit = TimeUnit.valueOf(interval[1]);
         
         catch <

以上是关于FlinkFlink 指标监测 相关源码的主要内容,如果未能解决你的问题,请参考以下文章

FlinkFlink 源码之 安全认证

FlinkFlink 1.12.2 Task的调度 源码

FLinkFlink 源码阅读笔记- RPC

FlinkFlink 1.13 侧流输出源码解析

股票软件火焰参数如何设置

FlinkFlink 源码之快照