Flink Job Submission Source Code Walkthrough (2): Generating the StreamGraph
Posted by 洽洽老大
Versions:
- Flink: release-1.14
- OS: Ubuntu 16.04
- IDE: IDEA
WordCount Source and Execution Flow Overview

The previous post, flink作业提交源码解析(1), showed that the callMainMethod method in flink-client uses reflection to run the entry class of the user code. This post follows what that user code does next.

We use the bundled WordCount.jar as the example, launched with:

```shell
bin/flink run -t remote -d ./examples/streaming/WordCount.jar
```

The WordCount code is as follows:
```java
public static void main(String[] args) throws Exception {
    // Checking input parameters
    final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // make parameters available in the web interface
    env.getConfig().setGlobalJobParameters(params);

    // get input data
    DataStream<String> text = null;
    if (params.has("input")) {
        // union all the inputs from text files
        for (String input : params.getMultiParameterRequired("input")) {
            if (text == null) {
                text = env.readTextFile(input);
            } else {
                text = text.union(env.readTextFile(input));
            }
        }
        Preconditions.checkNotNull(text, "Input DataStream should not be null.");
    } else {
        System.out.println("Executing WordCount example with default input data set.");
        System.out.println("Use --input to specify file input.");
        // get default test text data
        text = env.fromElements(WordCountData.WORDS);
    }

    DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .keyBy(value -> value.f0)
                    .sum(1);

    // emit result
    if (params.has("output")) {
        counts.writeAsText(params.get("output"));
    } else {
        System.out.println("Printing result to stdout. Use --output to specify output path.");
        counts.print();
    }

    // execute program
    env.execute("Streaming WordCount");
}
```
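The Tokenizer passed to flatMap above is not shown in the listing. Its core splitting logic can be sketched without any Flink dependency (TokenizeSketch and its tokenize method are hypothetical names used for illustration; the real Tokenizer implements Flink's FlatMapFunction and emits pairs through a Collector):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TokenizeSketch {
    // Splits a line into lowercase words and pairs each with a count of 1,
    // mirroring what WordCount's Tokenizer emits via its Collector.
    static List<Map.Entry<String, Integer>> tokenize(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                out.add(Map.entry(token, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("To be, or not to be"));
        // [to=1, be=1, or=1, not=1, to=1, be=1]
    }
}
```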
In the code above, executing env.execute("Streaming WordCount") delegates job submission to the concrete ExecutionEnvironment, here a StreamExecutionEnvironment. As the code below shows, the logic is simply: build the streamGraph, then execute it.

```java
// StreamExecutionEnvironment
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

    final StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);
    return execute(streamGraph);
}
```
Obtaining and Creating the StreamExecutionEnvironment

```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
```

This obtains an ExecutionEnvironment:
```java
// StreamExecutionEnvironment
/**
 * Creates an execution environment that represents the context in which the program is
 * currently executed. If the program is invoked standalone, this method returns a local
 * execution environment, as returned by {@link #createLocalEnvironment()}.
 *
 * @return The execution environment of the context in which the program is executed.
 */
public static StreamExecutionEnvironment getExecutionEnvironment() {
    return getExecutionEnvironment(new Configuration());
}
```
The environment comes from a StreamExecutionEnvironmentFactory; the contextEnvironmentFactory used here was initialized by the client before the user code ran.
```java
// StreamExecutionEnvironment
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
    return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
            .map(factory -> factory.createExecutionEnvironment(configuration))
            .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
```
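The Optional chain above tries the thread-local factory, then the global context factory, and only falls back to creating a local environment when neither is set. That resolution pattern can be sketched in plain Java (EnvFactory, getEnv, and the environment strings below are hypothetical stand-ins, not Flink types):

```java
import java.util.Optional;

public class FactoryResolution {
    interface EnvFactory {
        String create(String config);
    }

    // Mirrors Utils.resolveFactory: prefer the thread-local factory, else the global one.
    static Optional<EnvFactory> resolve(EnvFactory threadLocal, EnvFactory global) {
        return Optional.ofNullable(threadLocal != null ? threadLocal : global);
    }

    static String getEnv(EnvFactory threadLocal, EnvFactory global, String config) {
        return resolve(threadLocal, global)
                .map(f -> f.create(config))
                .orElseGet(() -> "LocalEnvironment(" + config + ")");
    }

    public static void main(String[] args) {
        EnvFactory remote = cfg -> "RemoteEnvironment(" + cfg + ")";
        // With a context factory installed (as the client does before running user code):
        System.out.println(getEnv(null, remote, "c1"));
        // Without any factory installed: falls back to a local environment.
        System.out.println(getEnv(null, null, "c2"));
    }
}
```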
(Image unavailable: stream-1.png)
When WordCount reaches env.execute("Streaming WordCount"), the text DataStream contains a LegacySourceTransformation:
(Image unavailable: stream-2.png)
while counts contains a ReduceTransformation:
(Image unavailable: stream-3.png)
The logic for obtaining the StreamGraph lives in the getStreamGraph() call on the line final StreamGraph streamGraph = getStreamGraph();. Stepping in:
```java
// StreamExecutionEnvironment
@Internal
public StreamGraph getStreamGraph() {
    return getStreamGraph(true);
}

// After the StreamGraph has been generated here, the StreamExecutionEnvironment's
// list of transformations is cleared.
@Internal
public StreamGraph getStreamGraph(boolean clearTransformations) {
    final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
    if (clearTransformations) {
        transformations.clear();
    }
    return streamGraph;
}

private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
    if (transformations.size() <= 0) {
        throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
    }

    return new StreamGraphGenerator(transformations, config, checkpointCfg, configuration)
            .setStateBackend(defaultStateBackend)
            .setChangelogStateBackendEnabled(changelogStateBackendEnabled)
            .setSavepointDir(defaultSavepointDirectory)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout)
            .setSlotSharingGroupResource(slotSharingGroupResources);
}
```
Ultimately, the StreamGraphGenerator builds the StreamGraph. Printing the transformations to be converted shows that this job contains 3 Transformations:

(Image unavailable: stream-4.png)
StreamGraph Generation Flow Overview

- First, a tree of transformations is built up in the env and stored in List<Transformation<?>> transformations; as shown below, it contains 3 Transformations.
- Next, the transformations are traversed:
  - OneInputTransformation:
    - Its input is a LegacySourceTransformation, which generates the Source: Collection Source StreamNode.
    - Processing the OneInputTransformation itself generates the Flat Map StreamNode.
    - A StreamEdge (Source: Collection Source-1_Flat Map-2_0_FORWARD_0) is added, connecting the upstream Source: Collection Source to Flat Map. Since upstream and downstream have the same parallelism and no partitioning is specified, the partitioner is FORWARD.
  - ReduceTransformation:
    - Its input is a PartitionTransformation. This Transformation generates no StreamNode; it only records a virtual partition node in the virtualPartitionNodes map of the StreamGraph.
    - Processing the ReduceTransformation generates the Keyed Aggregation StreamNode.
    - A StreamEdge is added to connect the node to its upstream. The upstream turns out to be a virtual partition node, so the real upstream StreamNode (Flat Map) is looked up in virtualPartitionNodes, and the StreamEdge (Flat Map-2_Keyed Aggregation-4_0_HASH_0) connects Flat Map to Keyed Aggregation. Here the partitioner is explicitly HASH.
  - LegacySinkTransformation:
    - Its input is the ReduceTransformation, whose StreamNode has already been generated.
    - Processing the LegacySinkTransformation generates the Sink: Print to Std. Out StreamNode.
    - A StreamEdge (Keyed Aggregation-4_Sink: Print to Std. Out-5_0_FORWARD_0) is added, connecting Keyed Aggregation to Sink: Print to Std. Out. Again, same parallelism and no explicit partitioning, so the partitioner is FORWARD.

(Image unavailable: stream-graph.png)

The final StreamGraph:

(Image unavailable: stream-graph1.png)
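The traversal described above can be condensed into a toy model: operators become real nodes, a PartitionTransformation only records a virtual node in a side map, and an edge drawn from a virtual node is resolved back to the real upstream while picking up its partitioner. This is an illustrative sketch with hypothetical names, not Flink's actual classes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of StreamGraph bookkeeping: real nodes, virtual partition nodes, edges.
public class MiniStreamGraph {
    final Map<Integer, String> nodes = new LinkedHashMap<>();            // id -> operator name
    final Map<Integer, Integer> virtualPartitionNodes = new HashMap<>(); // virtual id -> real upstream id
    final Map<Integer, String> partitioners = new HashMap<>();           // virtual id -> partitioner
    final List<String> edges = new ArrayList<>();

    void addNode(int id, String name) {
        nodes.put(id, name);
    }

    // A PartitionTransformation produces no StreamNode, only a virtual node entry.
    void addVirtualPartitionNode(int virtualId, int upstreamId, String partitioner) {
        virtualPartitionNodes.put(virtualId, upstreamId);
        partitioners.put(virtualId, partitioner);
    }

    void addEdge(int upstreamId, int downstreamId) {
        String partitioner = "FORWARD"; // default when parallelism matches and nothing is specified
        if (virtualPartitionNodes.containsKey(upstreamId)) {
            // Resolve through the virtual node to the real upstream, keeping its partitioner.
            partitioner = partitioners.get(upstreamId);
            upstreamId = virtualPartitionNodes.get(upstreamId);
        }
        edges.add(nodes.get(upstreamId) + "-" + upstreamId + "_"
                + nodes.get(downstreamId) + "-" + downstreamId + "_" + partitioner);
    }

    public static void main(String[] args) {
        MiniStreamGraph g = new MiniStreamGraph();
        g.addNode(1, "Source: Collection Source");
        g.addNode(2, "Flat Map");
        g.addEdge(1, 2);
        g.addVirtualPartitionNode(3, 2, "HASH"); // keyBy inserts a PartitionTransformation
        g.addNode(4, "Keyed Aggregation");
        g.addEdge(3, 4);                         // resolved through the virtual node
        g.addNode(5, "Sink: Print to Std. Out");
        g.addEdge(4, 5);
        g.edges.forEach(System.out::println);
    }
}
```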
Tracing the StreamGraph Generation Source Code

Main Flow

The main flow has two steps:
- Initialize and configure the streamGraph.
- Traverse all the Transformations and translate each transformation.
```java
// StreamGraphGenerator
public StreamGraph generate() {
    // 1. Initialize and configure the StreamGraph
    streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    streamGraph.setEnableCheckpointsAfterTasksFinish(
            configuration.get(
                    ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
    shouldExecuteInBatchMode = shouldExecuteInBatchMode();
    configureStreamGraph(streamGraph);

    // Used to record the transformations that have already been translated
    alreadyTransformed = new HashMap<>();

    // 2. Translate each transformation
    for (Transformation<?> transformation : transformations) {
        transform(transformation);
    }

    streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);

    setFineGrainedGlobalStreamExchangeMode(streamGraph);

    for (StreamNode node : streamGraph.getStreamNodes()) {
        if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
            for (StreamEdge edge : node.getInEdges()) {
                edge.setSupportsUnalignedCheckpoints(false);
            }
        }
    }

    final StreamGraph builtStreamGraph = streamGraph;

    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;

    return builtStreamGraph;
}
```
The core work happens in transform(transformation).
```java
// StreamGraphGenerator
private Collection<Integer> transform(Transformation<?> transform) {
    // 1. If this transformation has already been translated, return its ids directly.
    //    This check is needed because the graph may contain loops.
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    LOG.debug("Transforming " + transform);

    // 2. Set the max parallelism
    if (transform.getMaxParallelism() <= 0) {
        // if the max parallelism hasn't been set, then first use the job wide max parallelism
        // from the ExecutionConfig.
        int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
        if (globalMaxParallelismFromConfig > 0) {
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
        }
    }

    // 3. Set the slot sharing group
    transform
            .getSlotSharingGroup()
            .ifPresent(
                    slotSharingGroup -> {
                        final ResourceSpec resourceSpec =
                                SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);
                        if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) {
                            slotSharingGroupResources.compute(
                                    slotSharingGroup.getName(),
                                    (name, profile) -> {
                                        if (profile == null) {
                                            return ResourceProfile.fromResourceSpec(
                                                    resourceSpec, MemorySize.ZERO);
                                        } else if (!ResourceProfile.fromResourceSpec(
                                                        resourceSpec, MemorySize.ZERO)
                                                .equals(profile)) {
                                            throw new IllegalArgumentException(
                                                    "The slot sharing group "
                                                            + slotSharingGroup.getName()
                                                            + " has been configured with two different resource spec.");
                                        } else {
                                            return profile;
                                        }
                                    });
                        }
                    });

    // 4. Throws an exception if the output type could not be inferred
    transform.getOutputType();

    // 5. Dispatch to the concrete translator
    @SuppressWarnings("unchecked")
    final TransformationTranslator<?, Transformation<?>> translator =
            (TransformationTranslator<?, Transformation<?>>)
                    translatorMap.get(transform.getClass());

    Collection<Integer> transformedIds;
    if (translator != null) {
        transformedIds = translate(translator, transform);
    } else {
        transformedIds = legacyTransform(transform);
    }

    // need this check because the iterate transformation adds itself before
    // transforming the feedback edges
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }

    return transformedIds;
}
```
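The alreadyTransformed guard at the start and end of transform is what keeps a shared upstream (or a loop) from being translated twice. A minimal sketch of that memoized recursion, using hypothetical toy types rather than Flink's:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransformMemo {
    // Toy transformation: just a name plus its inputs.
    record T(String name, List<T> inputs) {}

    static final Map<T, Integer> alreadyTransformed = new HashMap<>();
    static int nextId = 1;
    static int translateCalls = 0;

    // Mirrors StreamGraphGenerator.transform: translate inputs first, memoize the result.
    static int transform(T t) {
        Integer done = alreadyTransformed.get(t);
        if (done != null) {
            return done; // already translated via another path
        }
        t.inputs().forEach(TransformMemo::transform);
        translateCalls++;
        int id = nextId++;
        alreadyTransformed.put(t, id);
        return id;
    }

    public static void main(String[] args) {
        T source = new T("source", List.of());
        T mapA = new T("mapA", List.of(source));
        T mapB = new T("mapB", List.of(source)); // diamond: source is shared
        T union = new T("union", List.of(mapA, mapB));
        transform(union);
        // source is translated only once even though it is reached twice.
        System.out.println(translateCalls); // 4
    }
}
```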
The steps are:
- If the transformation has already been translated, return its transformedIds directly; this check is needed because the graph may contain loops.
- Set the max parallelism.
- Set the SlotSharingGroup.
- Call transform.getOutputType(), which throws an exception if the output type could not be inferred.
- Dispatch to the concrete translator. The built-in translators are all registered in translatorMap, shown below:
```java
// StreamGraphGenerator
static {
    @SuppressWarnings("rawtypes")
    Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
            tmp = new HashMap<>();
    tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
    tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
    tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
    tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
    tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
    tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
    tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
    tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
    tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
    tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
    tmp.put(
            TimestampsAndWatermarksTransformation.class,
            new TimestampsAndWatermarksTransformationTranslator<>());
    tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
    tmp.put(
            KeyedBroadcastStateTransformation.class,
            new KeyedBroadcastStateTransformationTranslator<>());
    translatorMap = Collections.unmodifiableMap(tmp);
}
```
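The registration above is a plain class-to-translator lookup: transform fetches the translator for the transformation's concrete class and falls back to legacyTransform when none is registered. A self-contained sketch of that dispatch (all names here are hypothetical, not Flink's):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class TranslatorDispatch {
    interface Transformation {}
    static class OneInput implements Transformation {}
    static class Partition implements Transformation {}
    static class Legacy implements Transformation {} // no registered translator

    // class -> translator, like StreamGraphGenerator.translatorMap
    static final Map<Class<? extends Transformation>, Function<Transformation, String>> translatorMap;
    static {
        Map<Class<? extends Transformation>, Function<Transformation, String>> tmp = new HashMap<>();
        tmp.put(OneInput.class, t -> "translated OneInput");
        tmp.put(Partition.class, t -> "translated Partition (virtual node only)");
        translatorMap = Collections.unmodifiableMap(tmp);
    }

    static String transform(Transformation t) {
        Function<Transformation, String> translator = translatorMap.get(t.getClass());
        // Mirrors the null check in transform(): unregistered classes go to the legacy path.
        return translator != null ? translator.apply(t) : "legacyTransform fallback";
    }

    public static void main(String[] args) {
        System.out.println(transform(new OneInput()));
        System.out.println(transform(new Legacy()));
    }
}
```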
- Finally, record the translated transformation in alreadyTransformed.

The logic of calling the concrete translator is as follows:
```java
// StreamGraphGenerator
private Collection<Integer> translate(
        final TransformationTranslator<?, Transformation<?>> translator,
        final Transformation<?> transform) {

    checkNotNull(translator);
    checkNotNull(transform);

    final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());

    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }
    // ... (listing truncated in the original)
```