Flink Job Submission Source Code Analysis (2): Generating the StreamGraph

Posted by 洽洽老大


Original post
Versions

flink: release-1.14
os: ubuntu 16.04
IDE: IDEA

Overview of the WordCount Source and Its Execution Flow

The previous article, Flink Job Submission Source Code Analysis (1), showed that callMainMethod in flink-client uses reflection to invoke the entry class of the user code. This article digs into what that user code actually does when it runs.

We use the bundled WordCount.jar as the example. The run command is:

bin/flink run -t remote -d ./examples/streaming/WordCount.jar

The WordCount code is as follows:

public static void main(String[] args) throws Exception {

    // Checking input parameters
    final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // make parameters available in the web interface
    env.getConfig().setGlobalJobParameters(params);

    // get input data
    DataStream<String> text = null;
    if (params.has("input")) {
        // union all the inputs from text files
        for (String input : params.getMultiParameterRequired("input")) {
            if (text == null) {
                text = env.readTextFile(input);
            } else {
                text = text.union(env.readTextFile(input));
            }
        }
        Preconditions.checkNotNull(text, "Input DataStream should not be null.");
    } else {
        System.out.println("Executing WordCount example with default input data set.");
        System.out.println("Use --input to specify file input.");
        // get default test text data
        text = env.fromElements(WordCountData.WORDS);
    }

    DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .keyBy(value -> value.f0)
                    .sum(1);

    // emit result
    if (params.has("output")) {
        counts.writeAsText(params.get("output"));
    } else {
        System.out.println("Printing result to stdout. Use --output to specify output path.");
        counts.print();
    }
    // execute program
    env.execute("Streaming WordCount");
}

In the code above, the call env.execute("Streaming WordCount") hands the job to the concrete ExecutionEnvironment, here a StreamExecutionEnvironment. As the code below shows, the logic is simply: build a StreamGraph, then execute it.

// StreamExecutionEnvironment
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
    final StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);
    return execute(streamGraph);
}

Obtaining the StreamExecutionEnvironment

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

This call returns an execution environment:

// StreamExecutionEnvironment
/**
 * Creates an execution environment that represents the context in which the program is
 * currently executed. If the program is invoked standalone, this method returns a local
 * execution environment, as returned by {@link #createLocalEnvironment()}.
 *
 * @return The execution environment of the context in which the program is executed.
 */
public static StreamExecutionEnvironment getExecutionEnvironment() {
    return getExecutionEnvironment(new Configuration());
}
The environment is resolved through a StreamExecutionEnvironmentFactory; the contextEnvironmentFactory used here was already initialized by the client before the user code started running.

// StreamExecutionEnvironment

public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
    return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
            .map(factory -> factory.createExecutionEnvironment(configuration))
            .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
[Figure: stream-1.png]

When the WordCount code reaches env.execute("Streaming WordCount"), you can see in the debugger that the text DataStream wraps a LegacySourceTransformation,

[Figure: stream-2.png]

while counts wraps a ReduceTransformation.

[Figure: stream-3.png]
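You can observe the same thing without a debugger. The following minimal sketch (assuming Flink 1.14 on the classpath) prints the Transformation wrapped by each DataStream; DataStream#getTransformation() is a public accessor.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class InspectTransformations {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.fromElements("to be", "or not to be");
        DataStream<Integer> lengths = text.map(s -> s.length()).returns(Types.INT);

        // Each DataStream wraps the Transformation that produced it
        System.out.println(text.getTransformation());     // a source transformation
        System.out.println(lengths.getTransformation());  // a OneInputTransformation
    }
}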

The logic that builds the StreamGraph sits behind the getStreamGraph() call in final StreamGraph streamGraph = getStreamGraph();. Stepping into it:

// StreamExecutionEnvironment
@Internal
public StreamGraph getStreamGraph() {
    return getStreamGraph(true);
}

// After the StreamGraph has been generated, the StreamExecutionEnvironment's list of transformations is cleared
@Internal
public StreamGraph getStreamGraph(boolean clearTransformations) {
    final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
    if (clearTransformations) {
        transformations.clear();
    }
    return streamGraph;
}

private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
    if (transformations.size() <= 0) {
        throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
    }
    return new StreamGraphGenerator(transformations, config, checkpointCfg, configuration)
            .setStateBackend(defaultStateBackend)
            .setChangelogStateBackendEnabled(changelogStateBackendEnabled)
            .setSavepointDir(defaultSavepointDirectory)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout)
            .setSlotSharingGroupResource(slotSharingGroupResources);
}
Ultimately a StreamGraphGenerator is used to build the StreamGraph. Printing the transformations to be translated shows that this job contains three Transformations:

[Figure: stream-4.png]

Overview of StreamGraph Generation

  • First, the env builds a tree of Transformations, stored in List<Transformation<?>> transformations; as the figure below shows, it contains three Transformations.
  • Then the transformations are traversed:
    • OneInputTransformation
      1. Its input is a LegacySourceTransformation, which produces the Source: Collection Source StreamNode.
      2. The OneInputTransformation itself is processed, producing the Flat Map StreamNode.
      3. A StreamEdge (Source: Collection Source-1_Flat Map-2_0_FORWARD_0) is added to connect the upstream Source: Collection Source to Flat Map; since upstream and downstream have the same parallelism and no partitioner was specified, the partitioning is FORWARD.
    • ReduceTransformation:
      1. Its input is a PartitionTransformation, which does not produce a StreamNode; instead it is recorded as a virtual partition node in the StreamGraph's virtualPartitionNodes map.
      2. The ReduceTransformation itself is processed, producing the Keyed Aggregation StreamNode.
      3. A StreamEdge is added to connect the upstream node to this one; because the upstream is a virtual partition node, the real upstream StreamNode Flat Map is looked up in virtualPartitionNodes, and the StreamEdge (Flat Map-2_Keyed Aggregation-4_0_HASH_0) connects Flat Map to Keyed Aggregation with the partitioning explicitly set to HASH.
    • LegacySinkTransformation:
      1. Its input is the ReduceTransformation, whose node has already been generated.
      2. The LegacySinkTransformation itself is processed, producing the Sink: Print to Std. Out StreamNode.
      3. A StreamEdge (Keyed Aggregation-4_Sink: Print to Std. Out-5_0_FORWARD_0) is added to connect the upstream Keyed Aggregation to Sink: Print to Std. Out; again, matching parallelism and no explicit partitioner means FORWARD partitioning.

[Figure: stream-graph.png]

The final StreamGraph is shown below; a short sketch for printing the same structure programmatically follows the figure.

[Figure: stream-graph1.png]
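The node and edge structure just described can also be printed programmatically. The sketch below (assuming Flink 1.14; it relies on the @Internal getStreamGraph() accessor) builds a pipeline equivalent to WordCount and dumps the resulting StreamNodes and StreamEdges, including their partitioners.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.util.Collector;

public class PrintStreamGraph {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("to be or not to be")
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1)
                .print();

        StreamGraph streamGraph = env.getStreamGraph();
        for (StreamNode node : streamGraph.getStreamNodes()) {
            System.out.println(node);                        // e.g. Flat Map-2
            node.getOutEdges().forEach(System.out::println); // StreamEdges with their partitioners
        }
    }
}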

Tracing StreamGraph Generation Through the Source

Main flow

It consists of two main steps:

  1. Initialize the StreamGraph and configure its settings.
  2. Traverse all the Transformations and translate each one.
// StreamGraphGenerator
public StreamGraph generate() {
    // 1. Initialize and configure the StreamGraph
    streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    streamGraph.setEnableCheckpointsAfterTasksFinish(
            configuration.get(
                    ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
    shouldExecuteInBatchMode = shouldExecuteInBatchMode();
    configureStreamGraph(streamGraph);
    // Used to record the Transformations that have already been translated
    alreadyTransformed = new HashMap<>();
    // 2. Translate each Transformation
    for (Transformation<?> transformation : transformations) {
        transform(transformation);
    }

    streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);

    setFineGrainedGlobalStreamExchangeMode(streamGraph);

    for (StreamNode node : streamGraph.getStreamNodes()) {
        if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
            for (StreamEdge edge : node.getInEdges()) {
                edge.setSupportsUnalignedCheckpoints(false);
            }
        }
    }

    final StreamGraph builtStreamGraph = streamGraph;

    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;

    return builtStreamGraph;
}

The core work happens in transform(transformation):

// StreamGraphGenerator
private Collection<Integer> transform(Transformation<?> transform) {
    // 1. If this transformation has already been translated, return its transformed ids directly;
    //    this check is needed because the graph may contain loops
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    LOG.debug("Transforming " + transform);
    // 2. Set the max parallelism
    if (transform.getMaxParallelism() <= 0) {

        // if the max parallelism hasn't been set, then first use the job wide max parallelism
        // from the ExecutionConfig.
        int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
        if (globalMaxParallelismFromConfig > 0) {
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
        }
    }
    // 3. Set the slot sharing group
    transform
            .getSlotSharingGroup()
            .ifPresent(
                    slotSharingGroup -> {
                        final ResourceSpec resourceSpec =
                                SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);
                        if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) {
                            slotSharingGroupResources.compute(
                                    slotSharingGroup.getName(),
                                    (name, profile) -> {
                                        if (profile == null) {
                                            return ResourceProfile.fromResourceSpec(
                                                    resourceSpec, MemorySize.ZERO);
                                        } else if (!ResourceProfile.fromResourceSpec(
                                                        resourceSpec, MemorySize.ZERO)
                                                .equals(profile)) {
                                            throw new IllegalArgumentException(
                                                    "The slot sharing group "
                                                            + slotSharingGroup.getName()
                                                            + " has been configured with two different resource spec.");
                                        } else {
                                            return profile;
                                        }
                                    });
                        }
                    });

    // 4. getOutputType() throws an exception if the output type could not be inferred
    transform.getOutputType();

    // 5. Look up and call the concrete translator
    @SuppressWarnings("unchecked")
    final TransformationTranslator<?, Transformation<?>> translator =
            (TransformationTranslator<?, Transformation<?>>)
                    translatorMap.get(transform.getClass());

    Collection<Integer> transformedIds;
    if (translator != null) {
        transformedIds = translate(translator, transform);
    } else {
        transformedIds = legacyTransform(transform);
    }

    // need this check because the iterate transformation adds itself before
    // transforming the feedback edges
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }

    return transformedIds;
}
The steps are:

  1. If the transformation has already been translated, return its transformed ids directly; this check is needed because the graph may contain loops.
  2. Set the max parallelism.
  3. Set the SlotSharingGroup.
  4. Call getOutputType(), which throws an exception if the output type could not be inferred.
  5. Look up and call the concrete translator; the built-in translators are registered in translatorMap, shown below (a stripped-down illustration of this dispatch follows the list).
// StreamGraphGenerator
static {
    @SuppressWarnings("rawtypes")
    Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
            tmp = new HashMap<>();
    tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
    tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
    tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
    tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
    tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
    tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
    tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
    tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
    tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
    tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
    tmp.put(
            TimestampsAndWatermarksTransformation.class,
            new TimestampsAndWatermarksTransformationTranslator<>());
    tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
    tmp.put(
            KeyedBroadcastStateTransformation.class,
            new KeyedBroadcastStateTransformationTranslator<>());
    translatorMap = Collections.unmodifiableMap(tmp);
}

  6. Record the translated transformation in alreadyTransformed.
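The lookup in step 5 is plain class-keyed dispatch: the Transformation's runtime class selects a translator, and anything without a registered translator falls back to legacyTransform. The stripped-down sketch below illustrates the same pattern; the types are made up for illustration and are not Flink classes.

import java.util.HashMap;
import java.util.Map;

// Illustrative only: class-keyed dispatch like translatorMap.get(transform.getClass()).
final class TranslatorRegistry {

    interface Translator<T> {
        void translate(T transformation);
    }

    private final Map<Class<?>, Translator<?>> translators = new HashMap<>();

    <T> void register(Class<T> type, Translator<T> translator) {
        translators.put(type, translator);
    }

    @SuppressWarnings("unchecked")
    <T> void dispatch(T transformation) {
        Translator<T> translator = (Translator<T>) translators.get(transformation.getClass());
        if (translator != null) {
            translator.translate(transformation);  // known transformation type
        } else {
            // in Flink this branch is legacyTransform(transform)
            throw new IllegalStateException("No translator registered for " + transformation.getClass());
        }
    }
}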

The logic that calls the concrete translator is shown below:

// StreamGraphGenerator
private Collection<Integer> translate(
        final TransformationTranslator<?, Transformation<?>> translator,
        final Transformation<?> transform) {
    checkNotNull(translator);
    checkNotNull(transform);

    final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());

    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    // ... the rest of the method determines the slot sharing group and delegates to the translator
}

From here, translate() resolves the slot sharing group for the transformation and then hands it to the translator's translateForStreaming() (or translateForBatch() in batch execution mode), which is what actually creates the StreamNodes and StreamEdges described above.
