[Flink] Flink Source Code Analysis: Creating the JobGraph in Batch Mode
Posted by 九师兄
1. Overview
Repost of "Flink Source Code Analysis: Creating the JobGraph in Batch Mode", for personal study only.
Whether it runs in streaming or batch mode, Flink compiles our program into a JobGraph and submits that. We have already analyzed how the JobGraph is created in streaming mode; this article analyzes how it is created in batch mode.
The analysis below uses local mode as the example.
We again use WordCount to walk through the JobGraph creation process. The WordCount code:
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// env.getConfig.setExecutionMode(ExecutionMode.BATCH)
val text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?",
"li wen tao li wen tao li wen tao"
)
text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
  .map((_, 1))
  .groupBy(0)
  .sum(1)
  .writeAsText("D:\\IDEASPARK\\flink\\wordcount", WriteMode.OVERWRITE)
env.execute()
After this WordCount program runs, the chain of DataSets it builds looks like this:
DataSource ——> FlatMapOperator ——> MapOperator ——> ScalaAggregateOperator ——> DataSink
Note that these "operators" are not operators at the dataflow (operator) level; they are operators at the DataSet level, and they are all still subtypes of DataSet (except for the DataSink).
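As a quick sanity check of that claim, the following sketch prints the concrete DataSet subtype produced by each step. It uses the Java DataSet API rather than the Scala API of the WordCount above, and the printed class names are what I would expect rather than output taken from this article, so treat it as illustrative only.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class DataSetChainInspection {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // fromElements returns a DataSource, the head of the DataSet-level chain
        DataSet<String> text = env.fromElements("Who's there?");

        // flatMap returns a FlatMapOperator, which is again a DataSet subtype
        DataSet<Tuple2<String, Integer>> tokens = text.flatMap(
            new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String word : line.toLowerCase().split("\\W+")) {
                        if (!word.isEmpty()) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                }
            });

        System.out.println(text.getClass().getSimpleName());   // expected: DataSource
        System.out.println(tokens.getClass().getSimpleName()); // expected: FlatMapOperator
    }
}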
Let's start at the execution entry point. In local mode, LocalEnvironment.execute() is invoked: it first creates the execution plan (Plan) and then hands that plan to the executor.
//LocalEnvironment
public JobExecutionResult execute(String jobName) throws Exception {
    if (executor == null) {
        startNewSession();
    }

    Plan p = createProgramPlan(jobName);

    // Session management is disabled, revert this commit to enable
    //p.setJobId(jobID);
    //p.setSessionTimeout(sessionTimeout);

    JobExecutionResult result = executor.executePlan(p);

    this.lastJobExecutionResult = result;
    return result;
}
The Plan itself is very simple: it only holds the sinks. Creating it means converting every DataSet built by the WordCount code into the corresponding operator at the dataflow (operator) level.
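To make "the Plan only holds the sinks" concrete, the finished Plan can be walked upwards from those sinks with a visitor. The sketch below assumes Plan.accept(Visitor) and Operator.getName() from org.apache.flink.api.common / org.apache.flink.util behave as described; it is an illustration of how the translated dataflow can be inspected, not code from this article.

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.util.Visitor;

// Visits every operator reachable from the Plan's sinks and prints its name.
public class PlanDump {
    public static void dump(Plan plan) {
        plan.accept(new Visitor<Operator<?>>() {
            @Override
            public boolean preVisit(Operator<?> operator) {
                // called before the inputs of this operator are visited
                System.out.println("operator: " + operator.getName());
                return true; // keep descending towards the sources
            }

            @Override
            public void postVisit(Operator<?> operator) {
                // called after all inputs of this operator have been visited
            }
        });
    }
}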
2. Creating the execution plan (Plan)
First, let's look at the implementation of createProgramPlan():
//ExecutionEnvironment
public Plan createProgramPlan(String jobName, boolean clearSinks) {
    ...
    // create a translator and translate the graph starting from the sinks
    OperatorTranslation translator = new OperatorTranslation();
    Plan plan = translator.translateToPlan(this.sinks, jobName);
    ...
    return plan;
}
//OperatorTranslation
public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) {
    List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();

    // depth-first traversal upwards, starting from each sink
    for (DataSink<?> sink : sinks) {
        planSinks.add(translate(sink));
    }

    Plan p = new Plan(planSinks);
    p.setJobName(jobName);
    return p;
}
private <T> GenericDataSinkBase<T> translate(DataSink<T> sink) {
    // translate the input recursively
    // recurse upwards starting from the sink
    Operator<T> input = translate(sink.getDataSet());

    // translate the sink itself and connect it to the input
    GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input);

    translatedSink.setResources(sink.getMinResources(), sink.getPreferredResources());
    return translatedSink;
}
private <T> Operator<T> translate(DataSet<T> dataSet) {
    while (dataSet instanceof NoOpOperator) {
        dataSet = ((NoOpOperator<T>) dataSet).getInput();
    }

    // check if we have already translated that data set (operation or source)
    Operator<?> previous = this.translated.get(dataSet);
    if (previous != null) {
        ... // already translated
    }

    Operator<T> dataFlowOp;

    if (dataSet instanceof DataSource) {
        DataSource<T> dataSource = (DataSource<T>) dataSet;
        dataFlowOp = dataSource.translateToDataFlow();
        dataFlowOp.setResources(dataSource.getMinResources(), dataSource.getPreferredResources());
    }
    else if (dataSet instanceof SingleInputOperator) {
        SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) dataSet;
        dataFlowOp = translateSingleInputOperator(singleInputOperator);
        dataFlowOp.setResources(singleInputOperator.getMinResources(), singleInputOperator.getPreferredResources());
    }
    else if (dataSet instanceof TwoInputOperator) {
        TwoInputOperator<?, ?, ?, ?> twoInputOperator = (TwoInputOperator<?, ?, ?, ?>) dataSet;
        dataFlowOp = translateTwoInputOperator(twoInputOperator);
        dataFlowOp.setResources(twoInputOperator.getMinResources(), twoInputOperator.getPreferredResources());
    }
    else if (...) {
        ...
    }

    this.translated.put(dataSet, dataFlowOp);

    // take care of broadcast variables
    translateBcVariables(dataSet, dataFlowOp);

    return dataFlowOp;
}
private <I, O> org.apache.flink.api.common.operators.Operator<O> translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) {
    @SuppressWarnings("unchecked")
    SingleInputOperator<I, O, ?> typedOp = (SingleInputOperator<I, O, ?>) op;

    @SuppressWarnings("unchecked")
    DataSet<I> typedInput = (DataSet<I>) op.getInput();

    // when we hit a SingleInputOperator we keep recursing upwards, so the whole recursion is a
    // post-order traversal from the sink: the source is translated first, then each operator below it
    Operator<I> input = translate(typedInput);

    org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input);
    ...
    return dataFlowOp;
}
In short, translation recurses upwards starting from each sink. The whole recursion is a depth-first, post-order traversal from the sink: the source is translated first, then each operator below it in turn. The translation step is a call to translateToDataFlow() on each DataSet (or DataSink), which converts it into an operator-level Operator and wires the Operator produced for its upstream DataSet in as its input.
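The recursion pattern itself is easy to model outside of Flink. The sketch below is not Flink code; it is a hypothetical, stripped-down version of the same idea (all names in it are made up): recurse into the input first, memoize the result so a shared input is only translated once, then build the current node's operator on top of the translated input.

import java.util.IdentityHashMap;
import java.util.Map;

// Toy model of OperatorTranslation: 'Node' stands in for DataSet/DataSink,
// 'Op' stands in for the operator-level Operator.
public class ToyTranslation {

    interface Op {
    }

    interface Node {
        Node input();          // null for a source
        Op toDataFlow(Op in);  // the per-node translateToDataFlow() counterpart
    }

    private final Map<Node, Op> translated = new IdentityHashMap<>();

    Op translate(Node node) {
        // reuse the result if this node was already translated (diamond-shaped graphs)
        Op previous = translated.get(node);
        if (previous != null) {
            return previous;
        }

        // post-order: translate the input (towards the source) first
        Op input = node.input() == null ? null : translate(node.input());

        // then translate the node itself, wiring the translated input into it
        Op op = node.toDataFlow(input);
        translated.put(node, op);
        return op;
    }
}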
Next, let's look at the translateToDataFlow() method of each kind of DataSet (and of the DataSink):
//DataSource
protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
    String name = this.name != null ? this.name : "at " + dataSourceLocationName + " (" + inputFormat.getClass().getName() + ")";
    if (name.length() > 150) {
        name = name.substring(0, 150);
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    GenericDataSourceBase<OUT, ?> source = new GenericDataSourceBase(this.inputFormat,
        new OperatorInformation<OUT>(getType()), name);
    source.setParallelism(parallelism);
    if (this.parameters != null) {
        source.getParameters().addAll(this.parameters);
    }
    if (this.splitDataProperties != null) {
        source.setSplitDataProperties(this.splitDataProperties);
    }
    return source;
}
//MapOperator
protected MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
    String name = getName() != null ? getName() : "Map at " + defaultName;
    // create operator
    MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function,
        new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
    // set input
    po.setInput(input);
    // set parallelism
    if (this.getParallelism() > 0) {
        // use specified parallelism
        po.setParallelism(this.getParallelism());
    } else {
        // if no parallelism has been specified, use parallelism of input operator to enable chaining
        po.setParallelism(input.getParallelism());
    }
    return po;
}
//ScalaAggregateOperator
protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> translateToDataFlow(Operator<IN> input) {
    // sanity check
    if (this.aggregationFunctions.isEmpty() || this.aggregationFunctions.size() != this.fields.size()) {
        throw new IllegalStateException();
    }

    // construct the aggregation function
    AggregationFunction<Object>[] aggFunctions = new AggregationFunction[this.aggregationFunctions.size()];
    int[] fields = new int[this.fields.size()];
    StringBuilder genName = new StringBuilder();

    for (int i = 0; i < fields.length; i++) {
        aggFunctions[i] = (AggregationFunction<Object>) this.aggregationFunctions.get(i);
        fields[i] = this.fields.get(i);

        genName.append(aggFunctions[i].toString()).append('(').append(fields[i]).append(')').append(',');
    }
    genName.setLength(genName.length() - 1);

    @SuppressWarnings("rawtypes")
    RichGroupReduceFunction<IN, IN> function = new AggregatingUdf(getInputType(), aggFunctions, fields);

    String name = getName() != null ? getName() : genName.toString();

    // distinguish between grouped reduce and non-grouped reduce
    // this branch handles the non-grouped reduce
    if (this.grouping == null) {
        // non grouped aggregation
        UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
        GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po =
            new GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, new int[0], name);

        po.setCombinable(true);

        // set input
        po.setInput(input);
        // set parallelism
        po.setParallelism(this.getParallelism());

        return po;
    }

    // this branch handles the grouped reduce; our WordCount code takes this path
    if (this.grouping.getKeys() instanceof Keys.ExpressionKeys) {
        // grouped aggregation
        int[] logicalKeyPositions = this.grouping.getKeys().computeLogicalKeyPositions();
        UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
        GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po =
            new GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, logicalKeyPositions, name);

        // the combiner is enabled by default, so records are pre-aggregated to reduce the data transferred
        po.setCombinable(true);

        // set input
        po.setInput(input);
        // set parallelism
        po.setParallelism(this.getParallelism());

        SingleInputSemanticProperties props = new SingleInputSemanticProperties();

        for (int keyField : logicalKeyPositions) {
            boolean keyFieldUsedInAgg = false;
            for (int aggField : fields) {
                if (keyField == aggField) {
                    keyFieldUsedInAgg = true;
                    break;
                }
            }

            if (!keyFieldUsedInAgg) {
                props.addForwardedField(keyField, keyField);
            }
        }

        po.setSemanticProperties(props);
        po.setCustomPartitioner(grouping.getCustomPartitioner());

        return po;
    }
    else if (this.grouping.getKeys() instanceof Keys.SelectorFunctionKeys) {
        throw new UnsupportedOperationException("Aggregate does not support grouping with KeySelector functions, yet.");
    }
    ...
}
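A note on setCombinable(true): because the aggregation (here a sum) is applied per group and can be computed incrementally, each parallel instance may pre-aggregate its local records before they are shuffled to the reducers, which is exactly what the comment about reducing data transfer refers to. The following toy sketch (my own illustration, not Flink code) shows the effect of such a local combine step:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the combine step: one subtask pre-sums its
// local (word, 1) records so fewer records have to cross the network.
public class CombineSketch {
    static Map<String, Integer> combineLocally(List<String> localWords) {
        Map<String, Integer> partialSums = new HashMap<>();
        for (String word : localWords) {
            partialSums.merge(word, 1, Integer::sum);
        }
        return partialSums;
    }

    public static void main(String[] args) {
        // e.g. [li, li, li, wen] -> {li=3, wen=1}: 2 records shipped instead of 4
        System.out.println(combineLocally(Arrays.asList("li", "li", "li", "wen")));
    }
}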