[Flink] Flink Source Code Analysis: JobGraph Creation in Batch Mode

Posted by 九师兄


1. Overview

Reposted from: Flink源码分析——批处理模式JobGraph的创建 (for my own study only).

Whether running in streaming or batch mode, Flink compiles our program into a JobGraph before submitting it. We previously analyzed JobGraph creation in streaming mode; here we look at how the JobGraph is created in batch mode.

This article uses local mode as the example to analyze how the JobGraph is created.

We again use WordCount as the example to walk through the JobGraph creation process. The WordCount code:

    import org.apache.flink.api.scala._
    import org.apache.flink.core.fs.FileSystem.WriteMode

    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
//    env.getConfig.setExecutionMode(ExecutionMode.BATCH)

    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?",
      "li wen tao li wen tao li wen tao"
    )

    text.flatMap { _.toLowerCase.split("\\W+").filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)
      .writeAsText("D:\\IDEASPARK\\flink\\wordcount", WriteMode.OVERWRITE)

    env.execute()

This WordCount builds the DataSet relationship graph shown below (note that nothing actually runs until env.execute(): each transformation only creates a new DataSet, and writeAsText() merely registers a DataSink with the environment):

DataSource ——> FlatMapOperator ——> MapOperator ——> ScalaAggregateOperator ——> DataSink

Note that the operators here are not operators at the operator (dataflow) level but operators at the DataSet level; all of them are still subtypes of DataSet (except for DataSink).

First, the execution entry point. In local mode, LocalEnvironment.execute() is called: it first creates the execution plan (Plan) and then executes that plan.

//LocalEnvironment
public JobExecutionResult execute(String jobName) throws Exception {
   if (executor == null) {
      startNewSession();
   }

   Plan p = createProgramPlan(jobName);

   // Session management is disabled, revert this commit to enable
   //p.setJobId(jobID);
   //p.setSessionTimeout(sessionTimeout);

   JobExecutionResult result = executor.executePlan(p);

   this.lastJobExecutionResult = result;
   return result;
}

The Plan itself is quite simple: it holds nothing more than the sinks. Creating the plan means translating every DataSet created in the WordCount code into the corresponding operator-level operator.
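As a quick way to look at the translated plan, here is a minimal sketch of my own (not from the original article; it relies on the DataSet API's getExecutionPlan(), which builds the Plan through createProgramPlan() and dumps it as JSON):

import org.apache.flink.api.java.ExecutionEnvironment;

public class PlanDump {
   public static void main(String[] args) throws Exception {
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      // a job needs at least one sink before a plan can be built
      env.fromElements("to be", "or not to be").writeAsText("/tmp/plan-dump-output");
      // prints the translated dataflow as JSON instead of executing it
      System.out.println(env.getExecutionPlan());
   }
}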

2. Creating the execution plan (Plan)

Let's start with the createProgramPlan() implementation:

//ExecutionEnvironment
public Plan createProgramPlan(String jobName, boolean clearSinks) {
   ...
   // create a translator and start the translation from the sinks
   OperatorTranslation translator = new OperatorTranslation();
   Plan plan = translator.translateToPlan(this.sinks, jobName);

   ...

   return plan;
}

//OperatorTranslation
public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) {
   List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();
   // start an upward depth-first traversal from each sink
   for (DataSink<?> sink : sinks) {
      planSinks.add(translate(sink));
   }

   Plan p = new Plan(planSinks);
   p.setJobName(jobName);
   return p;
}


private <T> GenericDataSinkBase<T> translate(DataSink<T> sink) {

   // translate the input recursively
   // recurse upward starting from the sink to translate its input
   Operator<T> input = translate(sink.getDataSet());

   // translate the sink itself and connect it to the input
   GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input);

   translatedSink.setResources(sink.getMinResources(), sink.getPreferredResources());

   return translatedSink;
}


private <T> Operator<T> translate(DataSet<T> dataSet) {
   while (dataSet instanceof NoOpOperator) {
      dataSet = ((NoOpOperator<T>) dataSet).getInput();
   }

   // check if we have already translated that data set (operation or source)
   Operator<?> previous = this.translated.get(dataSet);
   if (previous != null) {
      ... // already translated, reuse the cached operator
   }

   Operator<T> dataFlowOp;

   if (dataSet instanceof DataSource) {
      DataSource<T> dataSource = (DataSource<T>) dataSet;
      dataFlowOp = dataSource.translateToDataFlow();
      dataFlowOp.setResources(dataSource.getMinResources(), dataSource.getPreferredResources());
   }
   else if (dataSet instanceof SingleInputOperator) {
      SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) dataSet;
      dataFlowOp = translateSingleInputOperator(singleInputOperator);
      dataFlowOp.setResources(singleInputOperator.getMinResources(), singleInputOperator.getPreferredResources());
   }
   else if (dataSet instanceof TwoInputOperator) {
      TwoInputOperator<?, ?, ?, ?> twoInputOperator = (TwoInputOperator<?, ?, ?, ?>) dataSet;
      dataFlowOp = translateTwoInputOperator(twoInputOperator);
      dataFlowOp.setResources(twoInputOperator.getMinResources(), twoInputOperator.getPreferredResources());
   }
   else if (...)
   ...

   this.translated.put(dataSet, dataFlowOp);

   // take care of broadcast variables
   translateBcVariables(dataSet, dataFlowOp);

   return dataFlowOp;
}
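One detail worth noting in translate() above is the translated map: it guarantees that a DataSet consumed by several downstream operations is translated only once. A minimal sketch of my own (a hypothetical job, not from the article) where this caching takes effect:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class SharedInputExample {
   public static void main(String[] args) throws Exception {
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      // one DataSet, consumed by two sinks
      DataSet<String> text = env.fromElements("a", "b", "c");
      text.writeAsText("/tmp/shared-out-1");
      text.writeAsText("/tmp/shared-out-2");
      // during createProgramPlan(), each sink triggers translate(text),
      // but the 'translated' map ensures a single source node is created and reused
      env.execute("shared-input-example");
   }
}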


private <I, O> org.apache.flink.api.common.operators.Operator<O> translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) {

   @SuppressWarnings("unchecked")
   SingleInputOperator<I, O, ?> typedOp = (SingleInputOperator<I, O, ?>) op;

   @SuppressWarnings("unchecked")
   DataSet<I> typedInput = (DataSet<I>) op.getInput();
   // when a SingleInputOperator is encountered, keep recursing upward; the whole
   // recursion is a post-order traversal from the sink: the source is translated
   // first, then each operator downstream of it in turn
   Operator<I> input = translate(typedInput);

   org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input);

   ...

   return dataFlowOp;
}

Roughly speaking, the translation recurses upward starting from the sinks: the whole recursion is a depth-first (post-order) traversal from each sink, so the source is translated first and then each operator downstream of it in turn. The translation step itself is the translateToDataFlow() method of each DataSet (or DataSink), which converts the DataSet into an operator-level Operator, taking the Operator produced for its upstream as the input.
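Applied to our WordCount job, the translation should therefore produce an operator-level chain roughly like the one below (GenericDataSourceBase, MapOperatorBase and GroupReduceOperatorBase come from the translateToDataFlow() methods shown next, GenericDataSinkBase from translate(sink) above; the flatMap step presumably becomes a FlatMapOperatorBase in the same way the map step becomes a MapOperatorBase):

GenericDataSourceBase ——> FlatMapOperatorBase ——> MapOperatorBase ——> GroupReduceOperatorBase ——> GenericDataSinkBase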

Now let's look at the translateToDataFlow() method of each kind of DataSet (or DataSink):

//DataSource
protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
   String name = this.name != null ? this.name : "at " + dataSourceLocationName + " (" + inputFormat.getClass().getName() + ")";
   if (name.length() > 150) {
      name = name.substring(0, 150);
   }

   @SuppressWarnings({"unchecked", "rawtypes"})
   GenericDataSourceBase<OUT, ?> source = new GenericDataSourceBase(this.inputFormat,
      new OperatorInformation<OUT>(getType()), name);
   source.setParallelism(parallelism);
   if (this.parameters != null) {
      source.getParameters().addAll(this.parameters);
   }
   if (this.splitDataProperties != null) {
      source.setSplitDataProperties(this.splitDataProperties);
   }
   return source;
}


//MapOperator
protected MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {

   String name = getName() != null ? getName() : "Map at " + defaultName;
   // create operator
   MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function,
         new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
   // set input
   po.setInput(input);
   // set parallelism
   if (this.getParallelism() > 0) {
      // use specified parallelism
      po.setParallelism(this.getParallelism());
   } else {
      // if no parallelism has been specified, use parallelism of input operator to enable chaining
      po.setParallelism(input.getParallelism());
   }

   return po;
}


//ScalaAggregateOperator
protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> translateToDataFlow(Operator<IN> input) {

   // sanity check
   if (this.aggregationFunctions.isEmpty() || this.aggregationFunctions.size() != this.fields.size()) {
      throw new IllegalStateException();
   }

   // construct the aggregation function
   AggregationFunction<Object>[] aggFunctions = new AggregationFunction[this.aggregationFunctions.size()];
   int[] fields = new int[this.fields.size()];
   StringBuilder genName = new StringBuilder();

   for (int i = 0; i < fields.length; i++) {
      aggFunctions[i] = (AggregationFunction<Object>) this.aggregationFunctions.get(i);
      fields[i] = this.fields.get(i);

      genName.append(aggFunctions[i].toString()).append('(').append(fields[i]).append(')').append(',');
   }
   genName.setLength(genName.length() - 1);

   @SuppressWarnings("rawtypes")
   RichGroupReduceFunction<IN, IN> function = new AggregatingUdf(getInputType(), aggFunctions, fields);

   String name = getName() != null ? getName() : genName.toString();

   // distinguish between grouped reduce and non-grouped reduce
   // this branch handles the non-grouped reduce
   if (this.grouping == null) {
      // non grouped aggregation
      UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
      GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po =
            new GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, new int[0], name);

      po.setCombinable(true);

      // set input
      po.setInput(input);
      // set parallelism
      po.setParallelism(this.getParallelism());

      return po;
   }

   // this branch handles the grouped reduce; our WordCount code takes this path
   if (this.grouping.getKeys() instanceof Keys.ExpressionKeys) {
      // grouped aggregation
      int[] logicalKeyPositions = this.grouping.getKeys().computeLogicalKeyPositions();
      UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
      GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po =
            new GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, logicalKeyPositions, name);
      // the combiner is enabled by default, so data is pre-aggregated to reduce the amount of data shipped
      po.setCombinable(true);

      // set input
      po.setInput(input);
      // set parallelism
      po.setParallelism(this.getParallelism());

      SingleInputSemanticProperties props = new SingleInputSemanticProperties();

      for (int keyField : logicalKeyPositions) {
         boolean keyFieldUsedInAgg = false;
         for (int aggField : fields) {
            if (keyField == aggField) {
               keyFieldUsedInAgg = true;
               break;
            }
         }

         if (!keyFieldUsedInAgg) {
            props.addForwardedField(keyField, keyField);
         }
      }

      po.setSemanticProperties(props);
      po.setCustomPartitioner(grouping.getCustomPartitioner());

      return po;
   }
   else if (this.grouping.getKeys() instanceof Keys.SelectorFunctionKeys) {
      throw new UnsupportedOperationException("Aggregate does not support grouping with KeySelector functions, yet.");
   }
   ...
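To make the semantic-properties logic concrete for our WordCount job: groupBy(0).sum(1) gives logicalKeyPositions = [0] and an aggregation fields array of [1]. Key field 0 is not among the aggregated fields, so props.addForwardedField(0, 0) is called, declaring that the grouping key passes through the GroupReduceOperatorBase unchanged; the optimizer can exploit such forwarded-field information later, for example to avoid re-partitioning data that is already partitioned on that key.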