How Flink Metrics Work

Posted by 九师兄


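1. Overview

Reposted, with additions, from: Flink源码剖析:Metrics运作机制 (Flink Source Code Analysis: How Metrics Work)

1. Introduction to Metrics

1.1 What are Metrics?

Flink's Metrics collect indicators inside Flink so that developers can better understand the state of a job or a cluster. Once a cluster is running, it is hard to see what is actually happening inside it: whether it is running slowly or quickly, or whether something has gone wrong. Developers cannot watch every Task log in real time, especially when a job is very large or there are many jobs. In these situations Metrics give developers a clear view of a job's current state.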

1.2 Metric Types

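The metric types are:

  • Counter: accumulates a count, for example the number of records or the number of megabytes processed so far, which keeps being added to.
  • Gauge: the simplest metric; it reports the current value of something.
  • Meter: measures throughput, that is, the number of "events" per unit of time. It is a rate: the number of events divided by the elapsed time. You can think of it as an average over a rolling window.
  • Histogram: more complex and less commonly used; it tracks the distribution of values, for example Quantile, Mean, StdDev, Max, and Min.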
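To make the four types concrete, here is a minimal sketch in plain Java. The class names (MetricTypesSketch, SimpleCounter, and so on) are hypothetical stand-ins written for this article, not Flink's own Counter, Gauge, Meter, and Histogram interfaces, and the meter takes the elapsed time as an argument instead of tracking a real time window.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified models of the four metric types; not Flink's classes.
final class MetricTypesSketch {

    /** Counter: a number that is accumulated over time. */
    static final class SimpleCounter {
        private long count;
        void inc() { count++; }
        void inc(long n) { count += n; }
        long getCount() { return count; }
    }

    /** Gauge: reports the current value of something on demand. */
    static final class SimpleGauge<T> {
        private final Supplier<T> supplier;
        SimpleGauge(Supplier<T> supplier) { this.supplier = supplier; }
        T getValue() { return supplier.get(); }
    }

    /** Meter: number of events divided by the elapsed time (assumption: caller supplies the time). */
    static final class SimpleMeter {
        private long events;
        void markEvent() { events++; }
        double getRate(long elapsedSeconds) {
            return elapsedSeconds <= 0 ? 0.0 : (double) events / elapsedSeconds;
        }
    }

    /** Histogram: keeps samples and answers questions about their distribution. */
    static final class SimpleHistogram {
        private final List<Long> samples = new ArrayList<>();
        void update(long value) { samples.add(value); }
        long getMin() { return Collections.min(samples); }
        long getMax() { return Collections.max(samples); }
        double getMean() {
            return samples.stream().mapToLong(Long::longValue).average().orElse(0.0);
        }
    }

    public static void main(String[] args) {
        SimpleCounter recordsIn = new SimpleCounter();
        recordsIn.inc(3);
        SimpleGauge<Long> currentCount = new SimpleGauge<>(recordsIn::getCount);
        System.out.println("counter=" + recordsIn.getCount() + ", gauge=" + currentCount.getValue());
    }
}
```

The gauge in main simply wraps the counter's getter, which illustrates the difference between the two: the counter owns the state, while the gauge only reads whatever value it is pointed at.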

1.3 Metric Group

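Metrics in Flink are organized in a multi-level hierarchy of Groups rather than a flat structure. Metric Group + Metric Name uniquely identifies a metric.

The Metric Group levels include TaskManagerMetricGroup and TaskManagerJobMetricGroup. Each Job narrows down to the group of a specific task, and a task is split into TaskIOMetricGroup and OperatorMetricGroup. An Operator also has its own IO statistics and some metrics underneath it. The overall hierarchy looks roughly like the tree below. Metrics do not interfere with the system; they live in separate groups, and Flink lets you add your own Groups with your own hierarchy.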

•TaskManagerMetricGroup
 •TaskManagerJobMetricGroup
  •TaskMetricGroup
   •TaskIOMetricGroup
   •OperatorMetricGroup
    •$User-defined Group / $User-defined Metrics
    •OperatorIOMetricGroup
•JobManagerMetricGroup
 •JobManagerJobMetricGroup

JobManagerMetricGroup is comparatively simple. It corresponds to the Master, and it has fewer levels.
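The idea that Metric Group + Metric Name identifies a metric can be sketched as follows. GroupSketch is a hypothetical class written for this article, not Flink's AbstractMetricGroup, and the scope names used in main ("taskmanager", "myJob", and so on) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a hierarchical metric group; not Flink's AbstractMetricGroup.
final class GroupSketch {
    private final GroupSketch parent; // null for the root group
    private final String name;

    GroupSketch(GroupSketch parent, String name) {
        this.parent = parent;
        this.name = name;
    }

    /** Creates a child group, for example a user-defined group under an operator. */
    GroupSketch addGroup(String childName) {
        return new GroupSketch(this, childName);
    }

    /** Scope components from the root down to this group, as a fresh mutable list. */
    List<String> scope() {
        List<String> parts = parent == null ? new ArrayList<>() : parent.scope();
        parts.add(name);
        return parts;
    }

    /** Group scope plus metric name, joined by a delimiter, identifies the metric. */
    String metricIdentifier(String metricName, char delimiter) {
        List<String> parts = scope();
        parts.add(metricName);
        return String.join(String.valueOf(delimiter), parts);
    }

    public static void main(String[] args) {
        GroupSketch operator = new GroupSketch(null, "taskmanager")
                .addGroup("myJob")
                .addGroup("myTask")
                .addGroup("myOperator");
        // A user-defined group simply becomes one more level in the scope.
        System.out.println(operator.addGroup("custom").metricIdentifier("numRecords", '.'));
    }
}
```

Because the identifier is derived from the whole path, two metrics with the same name in different groups do not collide.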

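2. How Metrics Work

How do system or user-defined Metrics end up in a third-party store? There are generally two modes: push and pull. Below we analyze two Reporters, PrometheusReporter and PrometheusPushGatewayReporter, to understand how Metrics work and how push and pull differ.

2.1 Initializing the Reporter

This step performs the initialization work related to the third-party store.

2.1.1 PrometheusReporter

Starts a local REST server; Prometheus communicates with it and pulls the metrics.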
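Ahead of the Flink source, the pull idea can be sketched with the JDK's built-in HTTP server: try each candidate port, keep the first one that binds, and serve the in-memory metrics when a scraper asks for them. PullEndpointSketch, its "name value" text format, and the ports used in main are assumptions made for this sketch; it does not use the Prometheus client's HTTPServer class.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical pull-style endpoint built on the JDK HttpServer; not the Prometheus HTTPServer.
final class PullEndpointSketch {
    final Map<String, Long> metrics = new ConcurrentHashMap<>();
    private HttpServer server;

    /** Tries each candidate port in turn and keeps the first one that binds. */
    int open(Iterator<Integer> ports) {
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                server = HttpServer.create(new InetSocketAddress(port), 0);
                break;
            } catch (IOException e) {
                // assume a port conflict and try the next candidate
            }
        }
        if (server == null) {
            throw new IllegalStateException("Could not start HTTP server on any configured port.");
        }
        // The scraper pulls: nothing is sent until a request for /metrics arrives.
        server.createContext("/metrics", exchange -> {
            byte[] body = render().getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server.getAddress().getPort();
    }

    /** Renders one "name value" line per metric, sorted by name (made-up format). */
    String render() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Long> e : new TreeMap<>(metrics).entrySet()) {
            sb.append(e.getKey()).append(' ').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    void close() {
        if (server != null) {
            server.stop(0);
        }
    }

    public static void main(String[] args) {
        PullEndpointSketch endpoint = new PullEndpointSketch();
        endpoint.metrics.put("numRecordsIn", 42L);
        int port = endpoint.open(Arrays.asList(9249, 9250).iterator());
        System.out.println("Serving /metrics on port " + port);
        endpoint.close();
    }
}
```

The port loop mirrors the structure of open() in the listing that follows: an IOException is treated as a port conflict, and only when every candidate fails does initialization give up.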

public class PrometheusReporter extends AbstractPrometheusReporter {
    // Perform the initialization work related to the third-party store
    @Override
    public void open(MetricConfig config) {
        super.open(config);

        String portsConfig = config.getString(ARG_PORT, DEFAULT_PORT);
        Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);

        while (ports.hasNext()) {
            int port = ports.next();
            try {
                // internally accesses CollectorRegistry.defaultRegistry
                // For PrometheusReporter, start a local REST server that Prometheus pulls data from;
                // PrometheusPushGatewayReporter instead creates a PushGateway client from host + port at this point and sends data to the PushGateway
                httpServer = new HTTPServer(port);
                this.port = port;
                log.info("Started PrometheusReporter HTTP server on port {}.", port);
                break;
            } catch (IOException ioe) { //assume port conflict
                log.debug("Could not start PrometheusReporter HTTP server on port {}.", port, ioe);
            }
        }
        if (httpServer == null) {
            throw new RuntimeException("Could not start PrometheusReporter HTTP server on any configured port. Ports: " + portsConfig);
        }
    }
}

2.1.2 PrometheusPushGatewayReporter

Creates a PushGateway client and actively pushes metrics to the PushGateway.

public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
    @Override
    public void open(MetricConfig config) {
        super.open(config);

        String host = config.getString(HOST.key(), HOST.defaultValue());
        int port = config.getInteger(PORT.key(), PORT.defaultValue());
        String configuredJobName = config.getString(JOB_NAME.key(), JOB_NAME.defaultValue());
        boolean randomSuffix = config.getBoolean(RANDOM_JOB_NAME_SUFFIX.key(), RANDOM_JOB_NAME_SUFFIX.defaultValue());
        deleteOnShutdown = config.getBoolean(DELETE_ON_SHUTDOWN.key(), DELETE_ON_SHUTDOWN.defaultValue());

        if (host == null || host.isEmpty() || port < 1) {
            throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
        }

        if (randomSuffix) {
            this.jobName = configuredJobName + new AbstractID();
        } else {
            this.jobName = configuredJobName;
        }
        // Create a PushGateway client used to push Metrics to the PushGateway
        pushGateway = new PushGateway(host + ':' + port);
        log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName: {}, randomJobNameSuffix:{}, deleteOnShutdown:{}}", host, port, jobName, randomSuffix, deleteOnShutdown);
    }
}

2.2 Registering the Reporter

All configured Reporters are registered with the MetricRegistry object, which keeps them in a list.
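Ahead of the full constructor, the registration step boils down to three things: parse the reporting interval, schedule the reporter if it is a push-style (Scheduled) one, and add it to the list. The sketch below shows that shape in plain Java. RegistrySketch, CountingPushReporter, and Interval are hypothetical, and Reporter and Scheduled are redefined locally for illustration rather than imported from Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature of the registration step; Reporter and Scheduled are
// local stand-ins, not Flink's interfaces.
final class RegistrySketch {
    interface Reporter { }
    interface Scheduled { void report(); }

    /** A push-style reporter that only counts how often it was asked to report. */
    static final class CountingPushReporter implements Reporter, Scheduled {
        final AtomicInteger reports = new AtomicInteger();
        @Override public void report() { reports.incrementAndGet(); }
    }

    static final class Interval {
        final long period;
        final TimeUnit unit;
        Interval(long period, TimeUnit unit) { this.period = period; this.unit = unit; }
    }

    private final List<Reporter> reporters = new ArrayList<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    /** Parses strings like "10 SECONDS"; falls back to 10 seconds on any parse error. */
    static Interval parseInterval(String configured) {
        try {
            String[] parts = configured.split(" ");
            return new Interval(Long.parseLong(parts[0]), TimeUnit.valueOf(parts[1]));
        } catch (Exception e) {
            return new Interval(10, TimeUnit.SECONDS);
        }
    }

    /** Schedules push-style reporters, then keeps every reporter in the list. */
    void register(Reporter reporter, Interval interval) {
        if (reporter instanceof Scheduled) {
            executor.scheduleWithFixedDelay(
                    ((Scheduled) reporter)::report, interval.period, interval.period, interval.unit);
        }
        reporters.add(reporter);
    }

    int reporterCount() { return reporters.size(); }

    void shutdown() { executor.shutdownNow(); }

    public static void main(String[] args) throws InterruptedException {
        RegistrySketch registry = new RegistrySketch();
        CountingPushReporter push = new CountingPushReporter();
        registry.register(push, parseInterval("50 MILLISECONDS"));
        registry.register(new Reporter() { }, parseInterval("not an interval"));
        Thread.sleep(300);
        registry.shutdown();
        System.out.println("reporters=" + registry.reporterCount() + ", push reports=" + push.reports.get());
    }
}
```

A pull-style reporter is registered in exactly the same list; the only difference is that nothing is scheduled for it.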

public class MetricRegistryImpl implements MetricRegistry {
    public MetricRegistryImpl(MetricRegistryConfiguration config, Collection<ReporterSetup> reporterConfigurations) {
        this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
        this.scopeFormats = config.getScopeFormats();
        this.globalDelimiter = config.getDelimiter();
        this.delimiters = new ArrayList<>(10);
        this.terminationFuture = new CompletableFuture<>();
        this.isShutdown = false;

        // second, instantiate any custom configured reporters
        this.reporters = new ArrayList<>(4);
        // Periodic scheduler used by push-style reporters to report metrics to the external store at a fixed interval
        this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));

        this.queryService = null;
        this.metricQueryServiceRpcService = null;

        if (reporterConfigurations.isEmpty()) {
            // no reporters defined
            // by default, don't report anything
            LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
        } else {
            // Obtain every reporter according to the reporter types configured in flink-conf.yaml
            for (ReporterSetup reporterSetup : reporterConfigurations) {
                final String namedReporter = reporterSetup.getName();

                try {
                    Optional<String> configuredPeriod = reporterSetup.getIntervalSettings();
                    TimeUnit timeunit = TimeUnit.SECONDS;
                    long period = 10;

                    if (configuredPeriod.isPresent()) {
                        try {
                            String[] interval = configuredPeriod.get().split(" ");
                            period = Long.parseLong(interval[0]);
                            timeunit = TimeUnit.valueOf(interval[1]);
                        }
                        catch (Exception e) {
                            LOG.error("Cannot parse report interval from config: " + configuredPeriod +
                                    " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
                                    "Using default reporting interval.");
                        }
                    }
                    // The reporter instance
                    final MetricReporter reporterInstance = reporterSetup.getReporter();
                    final String className = reporterInstance.getClass().getName();
                    // Push-style reporter, e.g. PrometheusPushGatewayReporter
                    if (reporterInstance instanceof Scheduled) {
                        LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
                        // Periodically report metrics to the external store
                        executor.scheduleWithFixedDelay(
                                new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
                    } else {
                        LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
                    }
                    // Register the current reporter with the MetricRegistry, which acts as the registration center
                    reporters.add(reporterInstance);

                    String delimiterForReporter = reporterSetup.getDelimiter().orElse(String.valueOf(globalDelimiter));
                    if (delimiterForReporter.length() != 1) {
                        LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter);
                        delimiterForReporter = String.valueOf(globalDelimiter);
                    }
                    this.delimiters.add(delimiterForReporter.charAt(0));
                }
                catch (Throwable t) {
                    LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
                }
            }
        }
    }
}

2.3 Collecting Metrics into Memory

When a Flink job starts, its metrics are registered with the MetricRegistry.
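The key data structure in this step is a map from a list of label (tag) values to a child object that can read the metric's current value. A List works as a map key because its equals and hashCode are computed from its elements. The sketch below shows just that mechanism; LabeledCollectorSketch is a hypothetical stand-in, not the Prometheus client's SimpleCollector, and it uses a DoubleSupplier where the real library uses its own Child types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.DoubleSupplier;

// Hypothetical stand-in for a labeled collector; not the Prometheus SimpleCollector.
final class LabeledCollectorSketch {
    private final List<String> labelNames;
    // Key: the list of label values. Value: a child that reads the current metric value.
    private final ConcurrentMap<List<String>, DoubleSupplier> children = new ConcurrentHashMap<>();

    LabeledCollectorSketch(String... labelNames) {
        this.labelNames = Arrays.asList(labelNames);
    }

    /** Stores the child under its label values; one value is required per label name. */
    void setChild(DoubleSupplier child, String... labelValues) {
        if (labelValues.length != labelNames.size()) {
            throw new IllegalArgumentException("Incorrect number of labels.");
        }
        children.put(Arrays.asList(labelValues), child);
    }

    /** Looks the child up by an equal list of label values and reads its value now. */
    double read(String... labelValues) {
        DoubleSupplier child = children.get(Arrays.asList(labelValues));
        if (child == null) {
            throw new IllegalArgumentException("No child for labels " + Arrays.toString(labelValues));
        }
        return child.getAsDouble();
    }

    public static void main(String[] args) {
        LabeledCollectorSketch collector = new LabeledCollectorSketch("job", "task");
        long[] recordsIn = {0};
        collector.setChild(() -> recordsIn[0], "myJob", "myTask");
        recordsIn[0] = 42; // the job keeps updating the underlying metric
        System.out.println(collector.read("myJob", "myTask"));
    }
}
```

Note that the map holds a reader rather than a number, so whoever reads the child later sees the value at read time and not the value at registration time.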

// The MetricRegistryImpl class
public class MetricRegistryImpl implements MetricRegistry {
    // Registers a new {@link Metric} with this registry.
    @Override
    public void register(Metric metric, String metricName, AbstractMetricGroup group) {
        for (int i = 0; i < reporters.size(); i++) {
            MetricReporter reporter = reporters.get(i);
            try {
                if (reporter != null) {
                    FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
                    // Registering the metric does the following:
                    // 1. Registers the metric with the static CollectorRegistry.defaultRegistry instance
                    // 2. Sets the metric's initial value. The data lives in SimpleCollector#children, a ConcurrentMap<List<String>, Child> whose key is a list of tag values and whose value is the child object. A List's hashCode is computed from all of its elements, so the list can serve as a map key
                    reporter.notifyOfAddedMetric(metric, metricName, front);
                }
            } catch (Exception e) {
                LOG.warn("Error while registering metric.", e);
            }
        }
    }
}

public abstract class AbstractPrometheusReporter implements MetricReporter {
    private void addMetric(Metric metric, List<String> dimensionValues, Collector collector) {
        if (metric instanceof Gauge) {
            ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues));
        } else if (metric instanceof Counter) {
            ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues));
        } else if (metric instanceof Meter) {
            ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues));
        } else if (metric instanceof Histogram) {
            ((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues);
        } else {
            log.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
                metric.getClass().getName());
        }
    }
}

public abstract class SimpleCollector<Child> extends Collector {
    public <T extends Collector> T setChild(Child child, String... labelValues) {
        if (labelValues.length != this.labelNames.size()) {
            throw new IllegalArgumentException("Incorrect number of labels.");
        } else {
            // Store the child in SimpleCollector#children, a ConcurrentMap<List<String>, Child> keyed by the list of tag values (see the note in register above)
            this.children.put(Arrays.asList(labelValues), child);
            return (T) this;
        }
    }
}

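2.4 Sending Metrics to the Third-Party Store

  • PrometheusReporter
    Once the Metrics are stored in the Map-based container, the reporter's job is done; it simply waits for Prometheus to come and pull the Metrics periodically. This is the pull mode.

  • PrometheusPushGatewayReporter: Reporters in Flink come in two kinds: those that implement the Scheduled interface and those that do not. A reporter that implements Scheduled periodically sends Metrics to the third-party store (driven by a periodic scheduling thread pool), which is the push mode.

See the comments in the following source: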
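The difference between the two modes can be sketched on the reporter side. In this hypothetical example (the classes are written for this article and are not Flink's reporters), both reporters store metrics in memory the same way; the pull reporter stops there, while the push reporter adds a report() method that a scheduler would call. The Consumer is an assumed stand-in for a real push target such as a PushGateway client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical reporters contrasting pull and push; not Flink's reporter classes.
final class PushVsPullSketch {
    /** Shared behaviour: registration only stores the metric in memory. */
    static class InMemoryReporter {
        private final Map<String, Long> metrics = new TreeMap<>();
        void notifyOfAddedMetric(String name, long value) { metrics.put(name, value); }
        Map<String, Long> snapshot() { return new TreeMap<>(metrics); }
    }

    /** Pull: nothing more to do; the monitoring system asks for snapshot() when it scrapes. */
    static final class PullReporter extends InMemoryReporter { }

    /** Push: a scheduler calls report() periodically, which sends the snapshot out. */
    static final class PushReporter extends InMemoryReporter {
        private final Consumer<Map<String, Long>> target; // stand-in for a push client
        PushReporter(Consumer<Map<String, Long>> target) { this.target = target; }
        void report() { target.accept(snapshot()); }
    }

    public static void main(String[] args) {
        List<Map<String, Long>> received = new ArrayList<>();
        PushReporter push = new PushReporter(received::add);
        push.notifyOfAddedMetric("numRecordsIn", 42L);
        push.report(); // in the push mode this call comes from the periodic scheduler

        PullReporter pull = new PullReporter();
        pull.notifyOfAddedMetric("numRecordsIn", 42L);
        System.out.println("pushed=" + received + ", available to pull=" + pull.snapshot());
    }
}
```

In both cases registration is identical; what differs is only who initiates the transfer.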

public class MetricRegistryImpl implements MetricRegistry {
    public MetricRegistryImpl(MetricRegistryConfiguration config, Collection<ReporterSetup> reporterConfigurations) {
        this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
        this.scopeFormats = config.getScopeFormats();
        this.globalDelimiter = config.getDelimiter();
        this.delimiters = new ArrayList<>(10);
        this.terminationFuture = new CompletableFuture<>();
        this.isShutdown = false;

        // second, instantiate any custom configured reporters
        this.reporters = new ArrayList<>(4);

        this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));

        this.queryService = null;
        this.metricQueryServiceRpcService = null;

        if (reporterConfigurations.isEmpty()) {
            // no reporters defined
            // by default, don't report anything
            LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
        } else {
            for (ReporterSetup reporterSetup : reporterConfigurations) {
                final String namedReporter = reporterSetup.getName();

                try {
                    Optional<String> configuredPeriod = reporterSetup.getIntervalSettings();
                    TimeUnit timeunit = TimeUnit.SECONDS;
                    long period = 10;

                    if (configuredPeriod.isPresent()) {
                        try {
                            String[] interval = configuredPeriod.get().split(" ");
                            period = Long.parseLong(interval[0]);
                            timeunit = TimeUnit.valueOf(interval[1]);
                        }
                        catch (Exception e) {
                            LOG.error("Cannot parse report interval from config: " + configuredPeriod +
                                    " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
                                    "Using default reporting interval.");
                        }
                    }

                    final MetricReporter reporterInstance = reporterSetup.getReporter();
                    final String className = reporterInstance.getClass().getName();
                    // If the reporter implements Scheduled, it periodically reports Metrics to the third-party store
                    if (reporterInstance instanceof Scheduled) {
                        LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
                        // Start the timer thread that periodically reports Metrics
                        executor.scheduleWithFixedDelay(
                                new MetricRegistryImpl
                                // ... (the listing is truncated here in the source)