Understanding Flink in Depth ---- The Internal Structure of Metrics
Posted WangTuo
Starting with how Metrics are used
Flink has four kinds of metrics: Counters, Gauges, Histograms, and Meters.
How do you use a metric? Take Counter as an example:
 1 public class MyMapper extends RichMapFunction<String, String> {
 2     private transient Counter counter;
 3
 4     @Override
 5     public void open(Configuration config) {
 6         this.counter = getRuntimeContext()
 7             .getMetricGroup()
 8             .counter("myCounter");
 9     }
10
11     @Override
12     public String map(String value) throws Exception {
13         this.counter.inc();
14         return value;
15     }
16 }
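Line 7: getMetricGroup() obtains the MetricGroup.
Line 8: obtains the Metric instance from the MetricGroup.
So let's take a closer look at MetricGroup.
The Metric container -- MetricGroup
A MetricGroup is a container for Metric objects and metric subgroups.
Each of the following four methods registers the given Metric via addMetric() and returns it.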
(AbstractMetricGroup.java)
public <C extends Counter> C counter(String name, C counter) {
    addMetric(name, counter);
    return counter;
}

public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
    addMetric(name, gauge);
    return gauge;
}

public <H extends Histogram> H histogram(String name, H histogram) {
    addMetric(name, histogram);
    return histogram;
}

public <M extends Meter> M meter(String name, M meter) {
    addMetric(name, meter);
    return meter;
}
Note 1: another implementation of the MetricGroup interface, UnregisteredMetricsGroup, simply returns the Metric instance without registering it.
Note 2: a third implementation, ProxyMetricGroup, holds a parent MetricGroup and forwards every call to that parent.
(AbstractMetricGroup.java) Important fields
/** The registry that this metrics group belongs to. */
protected final MetricRegistry registry;

/** All metrics that are directly contained in this group. */
private final Map<String, Metric> metrics = new HashMap<>();

/** All metric subgroups of this group. */
private final Map<String, AbstractMetricGroup> groups = new HashMap<>();

/** Flag indicating whether this group has been closed. */
private volatile boolean closed;
(AbstractMetricGroup.java)
 1 protected void addMetric(String name, Metric metric) {
 2     if (metric == null) {
 3         LOG.warn("Ignoring attempted registration of a metric due to being null for name {}.", name);
 4         return;
 5     }
 6     // add the metric only if the group is still open
 7     synchronized (this) {
 8         if (!closed) {
 9             // immediately put without a 'contains' check to optimize the common case (no collision)
10             // collisions are resolved later
11             Metric prior = metrics.put(name, metric);
12
13             // check for collisions with other metric names
14             if (prior == null) {
15                 // no other metric with this name yet
16
17                 if (groups.containsKey(name)) {
18                     // we warn here, rather than failing, because metrics are tools that should not fail the
19                     // program when used incorrectly
20                     LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" +
21                             name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
22                 }
23
24                 registry.register(metric, name, this);
25             }
26             else {
27                 // we had a collision. put back the original value
28                 metrics.put(name, prior);
29
30                 // we warn here, rather than failing, because metrics are tools that should not fail the
31                 // program when used incorrectly
32                 LOG.warn("Name collision: Group already contains a Metric with the name '" +
33                         name + "'. Metric will not be reported." + Arrays.toString(scopeComponents));
34             }
35         }
36     }
37 }
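Let's walk through the addMetric() code in detail.
Line 7: acquires the mutex (the monitor of this).
Line 8: checks whether the group has already been closed.
Lines 11~34: add the Metric being registered to the metrics map.
A small trick here: the code assumes there is no key collision and puts the metric into the map right away, then checks afterwards whether a previous value was displaced. If one was, the original value is put back, so the first registration wins. In the common no-collision case this saves one map lookup.
Line 24: registers the Metric with the MetricRegistry; the next section covers this in detail.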
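The put-first, check-afterwards pattern can be tried in isolation. The following is a minimal sketch, not Flink code: the class PutThenCheckSketch and its methods are hypothetical names, metrics are modeled as plain Objects, and callers are assumed to pass non-null values (addMetric() rejects null before it reaches the map).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a metric group's name-to-metric map. */
final class PutThenCheckSketch {
    private final Map<String, Object> metrics = new HashMap<>();

    /** Returns true if the metric was added, false if the name was already taken. */
    boolean add(String name, Object metric) {
        // Optimistic path: a single put(), with no containsKey() lookup beforehand.
        Object prior = metrics.put(name, metric);
        if (prior == null) {
            return true;
        }
        // Collision: restore the original entry so the first registration wins.
        metrics.put(name, prior);
        return false;
    }

    Object get(String name) {
        return metrics.get(name);
    }

    public static void main(String[] args) {
        PutThenCheckSketch group = new PutThenCheckSketch();
        System.out.println(group.add("myCounter", "first"));  // true
        System.out.println(group.add("myCounter", "second")); // false
        System.out.println(group.get("myCounter"));           // first
    }
}
```

The collision path costs two extra map operations, which is acceptable because collisions are expected to be rare.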
Call addGroup() to add a subgroup.
(AbstractMetricGroup.java)
 1 private AbstractMetricGroup<?> addGroup(String name, ChildType childType) {
 2     synchronized (this) {
 3         if (!closed) {
 4             // adding a group with the same name as a metric creates problems in many reporters/dashboards
 5             // we warn here, rather than failing, because metrics are tools that should not fail the
 6             // program when used incorrectly
 7             if (metrics.containsKey(name)) {
 8                 LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" +
 9                         name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
10             }
11
12             AbstractMetricGroup newGroup = createChildGroup(name, childType);
13             AbstractMetricGroup prior = groups.put(name, newGroup);
14             if (prior == null) {
15                 // no prior group with that name
16                 return newGroup;
17             } else {
18                 // had a prior group with that name, add the prior group back
19                 groups.put(name, prior);
20                 return prior;
21             }
22         }
23         else {
24             // return a non-registered group that is immediately closed already
25             GenericMetricGroup closedGroup = new GenericMetricGroup(registry, this, name);
26             closedGroup.close();
27             return closedGroup;
28         }
29     }
30 }
31
32 protected GenericMetricGroup createChildGroup(String name, ChildType childType) {
33     switch (childType) {
34         case KEY:
35             return new GenericKeyMetricGroup(registry, this, name);
36         default:
37             return new GenericMetricGroup(registry, this, name);
38     }
39 }
40
41 /**
42  * Enum for indicating which child group should be created.
43  * `KEY` is used to create {@link GenericKeyMetricGroup}.
44  * `VALUE` is used to create {@link GenericValueMetricGroup}.
45  * `GENERIC` is used to create {@link GenericMetricGroup}.
46  */
47 protected enum ChildType {
48     KEY,
49     VALUE,
50     GENERIC
51 }
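Line 2: acquires the mutex.
Lines 12~21: create the new MetricGroup object, using the same put-then-restore pattern as addMetric().
Note that giving a subgroup the same name as a Metric object causes problems in many reporters and dashboards; the code only logs a warning for this.
Lines 25, 35, 37: all MetricGroup objects in the same tree share the same MetricRegistry.
Line 26: closes the MetricGroup. The close() method looks like this: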
 1 public void close() {
 2     synchronized (this) {
 3         if (!closed) {
 4             closed = true;
 5
 6             // close all subgroups
 7             for (AbstractMetricGroup group : groups.values()) {
 8                 group.close();
 9             }
10             groups.clear();
11
12             // un-register all directly contained metrics
13             for (Map.Entry<String, Metric> metric : metrics.entrySet()) {
14                 registry.unregister(metric.getValue(), metric.getKey(), this);
15             }
16             metrics.clear();
17         }
18     }
19 }
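Line 2: acquires the mutex.
The method then recursively closes all subgroups and unregisters all metrics.
In MetricGroup, addMetric(), addGroup(), close(), and getAllVariables() (not shown above) all need to acquire the mutex.
Reason: to prevent the resource leaks that could occur if metrics or subgroups were added while the group is being closed.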
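Why the lock matters can be seen in a stripped-down model. This is a sketch under simplifying assumptions, not Flink's implementation: ClosableGroupSketch is a hypothetical class, metrics are plain Objects, and there is no registry or subgroup tree. It only shows that the closed check and the map update happen under the same lock, so nothing can be added once close() has cleared the map.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical group that rejects additions once it has been closed. */
final class ClosableGroupSketch {
    private final Map<String, Object> metrics = new HashMap<>();
    private boolean closed; // guarded by the monitor of this

    /** Returns false when the group is already closed and the metric was dropped. */
    boolean add(String name, Object metric) {
        synchronized (this) {
            if (closed) {
                return false;
            }
            metrics.putIfAbsent(name, metric);
            return true;
        }
    }

    /** Idempotent: the second and later calls do nothing. */
    void close() {
        synchronized (this) {
            if (!closed) {
                closed = true;
                metrics.clear();
            }
        }
    }

    int size() {
        synchronized (this) {
            return metrics.size();
        }
    }

    public static void main(String[] args) {
        ClosableGroupSketch group = new ClosableGroupSketch();
        System.out.println(group.add("a", new Object())); // true
        group.close();
        System.out.println(group.add("b", new Object())); // false
        System.out.println(group.size());                 // 0
    }
}
```

Without the shared lock, an add() could pass the closed check, then run after close() had cleared the map, leaving a metric that is never unregistered.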
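Another important MetricGroup method is public String getMetricIdentifier(String metricName, CharacterFilter filter, int reporterIndex).
It returns a unique name for a Metric, to be used as its identifier.
The identifier has three parts: system scope, user scope, and metric name.
It has the form A.B.C, where A is the system scope, B is the user scope, C is the metric name, and '.' is the delimiter.
The system scope can be defined in conf/flink-conf.yaml.
The user scope is the tree of groups; it is defined by calling addGroup(String), and groups can be nested to multiple levels.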
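The A.B.C composition can be illustrated with a few lines of plain Java. This is only a sketch of the naming scheme described above: IdentifierSketch and its identifier() method are hypothetical, the scope values are made up for illustration, and the CharacterFilter and reporterIndex parameters of the real method are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that joins system scope, user scope and metric name. */
final class IdentifierSketch {

    static String identifier(List<String> systemScope, List<String> userScope, String metricName) {
        List<String> parts = new ArrayList<>(systemScope); // A: system scope components
        parts.addAll(userScope);                           // B: one entry per nested group
        parts.add(metricName);                             // C: the metric name itself
        return String.join(".", parts);
    }

    public static void main(String[] args) {
        // Illustrative values only; real scope components depend on the deployment.
        String id = identifier(
                Arrays.asList("localhost", "taskmanager", "tm-1"),
                Arrays.asList("MyMetrics", "Inner"),
                "myCounter");
        System.out.println(id); // localhost.taskmanager.tm-1.MyMetrics.Inner.myCounter
    }
}
```

Each nested addGroup(String) call contributes one more component to the user scope part.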
The bridge between MetricGroup and MetricReporter -- MetricRegistry
The MetricRegistry keeps track of all registered Metrics. It acts as the bridge between MetricGroups and MetricReporters.
MetricGroup's addMetric() method calls the MetricRegistry's register() method:
registry.register(metric, name, this);
MetricGroup's close() method calls the MetricRegistry's unregister() method:
registry.unregister(metric.getValue(), metric.getKey(), this);
 1 // ------------------------------------------------------------------------
 2 //  Metrics (de)registration
 3 // ------------------------------------------------------------------------
 4
 5 @Override
 6 public void register(Metric metric, String metricName, AbstractMetricGroup group) {
 7     synchronized (lock) {
 8         if (isShutdown()) {
 9             LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
10         } else {
11             if (reporters != null) {
12                 for (int i = 0; i < reporters.size(); i++) {
13                     MetricReporter reporter = reporters.get(i);
14                     try {
15                         if (reporter != null) {
16                             FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
17                             reporter.notifyOfAddedMetric(metric, metricName, front);
18                         }
19                     } catch (Exception e) {
20                         LOG.warn("Error while registering metric.", e);
21                     }
22                 }
23             }
24             try {
25                 if (queryService != null) {
26                     MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
27                 }
28             } catch (Exception e) {
29                 LOG.warn("Error while registering metric.", e);
30             }
31             try {
32                 if (metric instanceof View) {
33                     if (viewUpdater == null) {
34                         viewUpdater = new ViewUpdater(executor);
35                     }
36                     viewUpdater.notifyOfAddedView((View) metric);
37                 }
38             } catch (Exception e) {
39                 LOG.warn("Error while registering metric.", e);
40             }
41         }
42     }
43 }
44
45 @Override
46 public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
47     synchronized (lock) {
48         if (isShutdown()) {
49             LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down.");
50         } else {
51             if (reporters != null) {
52                 for (int i = 0; i < reporters.size(); i++) {
53                     try {
54                         MetricReporter reporter = reporters.get(i);
55                         if (reporter != null) {
56                             FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
57                             reporter.notifyOfRemovedMetric(metric, metricName, front);
58                         }
59                     } catch (Exception e) {
60                         LOG.warn("Error while registering metric.", e);
61                     }
62                 }
63             }
64             try {
65                 if (queryService != null) {
66                     MetricQueryService.notifyOfRemovedMetric(queryService, metric);
67                 }
68             } catch (Exception e) {
69                 LOG.warn("Error while registering metric.", e);
70             }
71             try {
72                 if (metric instanceof View) {
73                     if (viewUpdater != null) {
74                         viewUpdater.notifyOfRemovedView((View) metric);
75                     }
76                 }
77             } catch (Exception e) {
78                 LOG.warn("Error while registering metric.", e);
79             }
80         }
81     }
82 }
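The register() and unregister() methods are largely symmetric, so the notes below follow register().
Line 7: acquires the lock. The lock object is a dedicated new Object() rather than this, which leaves room for introducing a second lock later.
Lines 11~23: notify every attached MetricReporter of the added Metric.
Lines 24~30: notify the MetricQueryService of the added Metric.
MetricQueryService is an actor; it serializes Metrics and writes them to an output stream.
Lines 31~40: if the Metric implements the View interface, register it with the viewUpdater.
A Metric class that implements View is updated at a fixed time interval; the viewUpdater performs the update.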
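The fan-out in register() can be modeled without Flink on the classpath. The sketch below is a simplified, hypothetical model: RegistrySketch and its nested Reporter interface are stand-ins invented for illustration, and the query service and view updater are omitted. It keeps two properties of the code above: a dedicated lock object, and a try/catch per reporter so that one failing reporter does not stop the others from being notified.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical registry that forwards each added metric to every reporter. */
final class RegistrySketch {

    /** Simplified stand-in for a reporter; not the Flink interface. */
    interface Reporter {
        void notifyOfAddedMetric(Object metric, String metricName);
    }

    private final Object lock = new Object(); // dedicated lock instead of this
    private final List<Reporter> reporters = new ArrayList<>();
    private boolean shutdown; // guarded by lock

    void addReporter(Reporter reporter) {
        synchronized (lock) {
            reporters.add(reporter);
        }
    }

    /** Returns how many reporters accepted the metric without throwing. */
    int register(Object metric, String metricName) {
        synchronized (lock) {
            if (shutdown) {
                System.err.println("Cannot register metric, registry is shut down.");
                return 0;
            }
            int notified = 0;
            for (Reporter reporter : reporters) {
                try {
                    reporter.notifyOfAddedMetric(metric, metricName);
                    notified++;
                } catch (Exception e) {
                    // Metrics should never fail the program: log and continue.
                    System.err.println("Error while registering metric: " + e);
                }
            }
            return notified;
        }
    }

    void shutdown() {
        synchronized (lock) {
            shutdown = true;
        }
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.addReporter((metric, name) -> { throw new IllegalStateException("broken reporter"); });
        registry.addReporter((metric, name) -> System.out.println("added " + name));
        System.out.println(registry.register(new Object(), "myCounter"));
    }
}
```

In this run the first reporter throws, the second still receives the metric, and register() reports one successful notification.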
MetricReporter
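A MetricReporter exports Metrics to an external backend.
The parameters of the external backend can be set in conf/flink-conf.yaml.
Multiple external backends can be configured at the same time.
The MetricReporter interface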
 1 public interface MetricReporter {
 2
 3     // ------------------------------------------------------------------------
 4     //  life cycle
 5     // ------------------------------------------------------------------------
 6
 7
 8     void open(MetricConfig config);
 9
10     void close();
11
12     // ------------------------------------------------------------------------
13     //  adding / removing metrics
14     // ------------------------------------------------------------------------
15
16     void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
17
18
19     void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
20 }
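Line 8: configures the reporter.
Because a reporter's constructor takes no arguments, this method is where the reporter's fields get initialized.
It is always called after the object has been constructed.
Line 10: closes the reporter.
This method should close channels and streams and release any other resources.
Lines 16, 19: add and remove metrics.
A typical reporter class also implements the Scheduled interface so that it can report the current measurements: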
1 public interface Scheduled {
2
3     void report();
4 }
Line 3: the metric registry calls report() periodically to report the current measurements.
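To show how the add/remove notifications and the periodic report() call fit together, here is a self-contained sketch. It deliberately does not use the Flink interfaces: SimpleReporter and SimpleScheduled are hypothetical, cut-down stand-ins, counters are modeled with AtomicLong, and the backend is just standard output. A concurrent map is used on the assumption that report() runs on a different thread than the notifications.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

final class ReporterSketch {

    /** Hypothetical, simplified counterpart of a reporter's add/remove callbacks. */
    interface SimpleReporter {
        void notifyOfAddedMetric(AtomicLong counter, String metricName);
        void notifyOfRemovedMetric(String metricName);
    }

    /** Hypothetical counterpart of a scheduled reporting hook. */
    interface SimpleScheduled {
        void report();
    }

    /** Tracks the counters it was told about and prints them on each report(). */
    static final class StdoutReporter implements SimpleReporter, SimpleScheduled {
        // Sorted and thread-safe, so output order is stable across runs.
        private final Map<String, AtomicLong> counters = new ConcurrentSkipListMap<>();

        @Override
        public void notifyOfAddedMetric(AtomicLong counter, String metricName) {
            counters.put(metricName, counter);
        }

        @Override
        public void notifyOfRemovedMetric(String metricName) {
            counters.remove(metricName);
        }

        /** Renders one "name=value" line per tracked counter. */
        String snapshot() {
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, AtomicLong> e : counters.entrySet()) {
                sb.append(e.getKey()).append('=').append(e.getValue().get()).append('\n');
            }
            return sb.toString();
        }

        @Override
        public void report() {
            System.out.print(snapshot());
        }
    }

    public static void main(String[] args) {
        StdoutReporter reporter = new StdoutReporter();
        AtomicLong counter = new AtomicLong();
        reporter.notifyOfAddedMetric(counter, "myCounter");
        counter.incrementAndGet();
        reporter.report(); // a registry would call this on a timer
    }
}
```

The reporter holds references to the live metric objects and reads their current values only when report() is called, which is why it needs to be told about removals.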