[Flink] Flink 1.12.2 Source Code Walkthrough: A Look at Task

Posted by 九师兄


1. Overview

Reposted from: Flink 1.12.2 Source Code Walkthrough: A Look at Task

A Task represents one execution of a parallel subtask on a TaskManager.

A Task wraps a Flink operator (which may also be a user function) and runs it, providing all the services it needs to consume input data, produce its results (intermediate result partitions), and communicate with the JobManager.

Flink operators (implemented as subclasses of {@link AbstractInvokable}) have only data readers, writers, and certain event callbacks. The Task connects them to the network stack and actor messages, tracks the execution state, and handles exceptions.

Tasks have no knowledge of how they relate to other tasks, or whether they are the first attempt to execute the task or a repeated attempt; all of that is known only to the JobManager.

A Task knows only its own runnable code, the task's configuration, and the IDs of the intermediate results it consumes and produces (if any). Each Task is run by exactly one dedicated thread.

2. Code Walkthrough

Task is a class that implements the Runnable interface.

2.1. Fields

/** The thread group that contains all task threads. */
private static final ThreadGroup TASK_THREADS_GROUP = new ThreadGroup("Flink Task Threads");

/** For atomic updates of the task's execution state. */
private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState");

// ------------------------------------------------------------------------
//  Constant fields that are part of the initial Task construction
// ------------------------------------------------------------------------

/** The job that the task belongs to. */
private final JobID jobId;

/** The vertex in the JobGraph whose code the task executes. */
private final JobVertexID vertexId;

/** The execution attempt of the parallel subtask. */
private final ExecutionAttemptID executionId;

/** ID which identifies the slot in which the task is supposed to run. */
private final AllocationID allocationId;

/** TaskInfo object for this task. */
private final TaskInfo taskInfo;

/** The name of the task, including subtask indexes. */
private final String taskNameWithSubtask;

/** The job-wide configuration object. */
private final Configuration jobConfiguration;

/** The task-specific configuration. */
private final Configuration taskConfiguration;

/** The jar files used by this task. */
private final Collection<PermanentBlobKey> requiredJarFiles;

/** The classpaths used by this task. */
private final Collection<URL> requiredClasspaths;

/**
 * The name of the class that holds the invokable code, e.g.
 * org.apache.flink.streaming.runtime.tasks.SourceStreamTask or
 * org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.
 */
private final String nameOfInvokableClass;

/** Access to task manager configuration and host names. */
private final TaskManagerRuntimeInfo taskManagerConfig;

/** The memory manager to be used by this task. */
private final MemoryManager memoryManager;

/** The I/O manager to be used by this task. */
private final IOManager ioManager;

/** The BroadcastVariableManager to be used by this task. */
private final BroadcastVariableManager broadcastVariableManager;

/** The dispatcher for task events. */
private final TaskEventDispatcher taskEventDispatcher;

/** Information provider for external resources. */
private final ExternalResourceInfoProvider externalResourceInfoProvider;

/** The manager for state of operators running in this task/slot. */
private final TaskStateManager taskStateManager;

/**
 * Serialized version of the job-specific execution configuration (see {@link ExecutionConfig}).
 */
private final SerializedValue<ExecutionConfig> serializedExecutionConfig;

/** A record-oriented runtime result writer API for producing results. */
private final ResultPartitionWriter[] consumableNotifyingPartitionWriters;

/** The input gates, each an {@link InputGate} with a specific index. */
private final IndexedInputGate[] inputGates;

/** Connection to the task manager. */
private final TaskManagerActions taskManagerActions;

/** Input split provider for the task. */
private final InputSplitProvider inputSplitProvider;

/** Checkpoint notifier used to communicate with the CheckpointCoordinator. */
private final CheckpointResponder checkpointResponder;

/** The gateway for operators to send messages to the operator coordinators on the JobManager. */
private final TaskOperatorEventGateway operatorCoordinatorEventGateway;

/** GlobalAggregateManager used to update aggregates on the JobMaster. */
private final GlobalAggregateManager aggregateManager;

/** The library cache, from which the task can request its class loader. */
private final LibraryCacheManager.ClassLoaderHandle classLoaderHandle;

/** The cache for user-defined files that the invokable requires. */
private final FileCache fileCache;

/** The service for kvState registration of this task. */
private final KvStateService kvStateService;

/** The registry of this task which enables live reporting of accumulators. */
private final AccumulatorRegistry accumulatorRegistry;

/** The thread that executes the task. */
private final Thread executingThread;

/** Parent group for all metrics of this task. */
private final TaskMetricGroup metrics;

/** Partition producer state checker to request partition states from. */
private final PartitionProducerStateChecker partitionProducerStateChecker;

/** Executor to run future callbacks. */
private final Executor executor;

/** Future that is completed once {@link #run()} exits. */
private final CompletableFuture<ExecutionState> terminationFuture = new CompletableFuture<>();

// ------------------------------------------------------------------------
//  Fields that control the task execution. All these fields are volatile
//  (which means that they introduce memory barriers), to establish
//  proper happens-before semantics on parallel modification
// ------------------------------------------------------------------------

/**
 * Atomic flag (initially false) that makes sure the invokable is canceled exactly once upon
 * error.
 */
private final AtomicBoolean invokableHasBeenCanceled;

/**
 * The invokable of this task, if initialized. All accesses must copy the reference and check
 * for null, as this field is cleared as part of the disposal logic.
 */
@Nullable private volatile AbstractInvokable invokable;

/** The current execution state of the task. */
private volatile ExecutionState executionState = ExecutionState.CREATED;

/** The observed exception, in case the task execution failed. */
private volatile Throwable failureCause;

/**
 * Task cancellation interval in milliseconds (default: 30000). Initialized from the Flink
 * configuration; may also be set at the ExecutionConfig.
 */
private long taskCancellationInterval;

/** Initialized from the Flink configuration; may also be set at the ExecutionConfig. */
private long taskCancellationTimeout;

/**
 * This class loader should be set as the context class loader for threads that may dynamically
 * load user code.
 */
private UserCodeClassLoader userCodeClassLoader;
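
The executionState field and STATE_UPDATER work as a pair: the field is volatile, and state transitions go through an atomic compare-and-swap on it (Task#transitionState), so that, for example, a cancellation path and a failure path cannot both claim the same transition. Below is a minimal, self-contained sketch of that pattern; it is not Flink code, and the class, enum, and state names are made up for illustration.

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class StateMachineSketch {

    enum State { CREATED, DEPLOYING, RUNNING, CANCELING, CANCELED, FAILED }

    // CAS-based updater for the volatile 'state' field, mirroring Task's STATE_UPDATER.
    private static final AtomicReferenceFieldUpdater<StateMachineSketch, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(StateMachineSketch.class, State.class, "state");

    private volatile State state = State.CREATED;

    // Same shape as Task#transitionState(currentState, newState): the transition only
    // succeeds if no other thread has changed the state in the meantime.
    boolean transitionState(State current, State next) {
        return STATE_UPDATER.compareAndSet(this, current, next);
    }

    public static void main(String[] args) {
        StateMachineSketch task = new StateMachineSketch();
        System.out.println(task.transitionState(State.CREATED, State.DEPLOYING)); // true
        System.out.println(task.transitionState(State.CREATED, State.FAILED));    // false: already DEPLOYING
    }
}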

2.2. Constructor

   /**
     * <b>IMPORTANT:</b> This constructor may not start any work that would need to be undone in the
     * case of a failing task deployment.
     */
    public Task(
            JobInformation jobInformation,
            TaskInformation taskInformation,
            ExecutionAttemptID executionAttemptID,
            AllocationID slotAllocationId,
            int subtaskIndex,
            int attemptNumber,
            List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
            List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
            int targetSlotNumber,
            MemoryManager memManager,
            IOManager ioManager,
            ShuffleEnvironment<?, ?> shuffleEnvironment,
            KvStateService kvStateService,
            BroadcastVariableManager bcVarManager,
            TaskEventDispatcher taskEventDispatcher,
            ExternalResourceInfoProvider externalResourceInfoProvider,
            TaskStateManager taskStateManager,
            TaskManagerActions taskManagerActions,
            InputSplitProvider inputSplitProvider,
            CheckpointResponder checkpointResponder,
            TaskOperatorEventGateway operatorCoordinatorEventGateway,
            GlobalAggregateManager aggregateManager,
            LibraryCacheManager.ClassLoaderHandle classLoaderHandle,
            FileCache fileCache,
            TaskManagerRuntimeInfo taskManagerConfig,
            @Nonnull TaskMetricGroup metricGroup,
            ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
            PartitionProducerStateChecker partitionProducerStateChecker,
            Executor executor) {

        Preconditions.checkNotNull(jobInformation);
        Preconditions.checkNotNull(taskInformation);

        Preconditions.checkArgument(0 <= subtaskIndex, "The subtask index must be positive.");
        Preconditions.checkArgument(0 <= attemptNumber, "The attempt number must be positive.");
        Preconditions.checkArgument(
                0 <= targetSlotNumber, "The target slot number must be positive.");

        this.taskInfo =
                new TaskInfo(
                        taskInformation.getTaskName(),
                        taskInformation.getMaxNumberOfSubtasks(),
                        subtaskIndex,
                        taskInformation.getNumberOfSubtasks(),
                        attemptNumber,
                        String.valueOf(slotAllocationId));

        this.jobId = jobInformation.getJobId();
        this.vertexId = taskInformation.getJobVertexId();
        this.executionId = Preconditions.checkNotNull(executionAttemptID);
        this.allocationId = Preconditions.checkNotNull(slotAllocationId);
        this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks();
        this.jobConfiguration = jobInformation.getJobConfiguration();
        this.taskConfiguration = taskInformation.getTaskConfiguration();
        this.requiredJarFiles = jobInformation.getRequiredJarFileBlobKeys();
        this.requiredClasspaths = jobInformation.getRequiredClasspathURLs();
        this.nameOfInvokableClass = taskInformation.getInvokableClassName();
        this.serializedExecutionConfig = jobInformation.getSerializedExecutionConfig();

        Configuration tmConfig = taskManagerConfig.getConfiguration();
        this.taskCancellationInterval =
                tmConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
        this.taskCancellationTimeout =
                tmConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);

        this.memoryManager = Preconditions.checkNotNull(memManager);
        this.ioManager = Preconditions.checkNotNull(ioManager);
        this.broadcastVariableManager = Preconditions.checkNotNull(bcVarManager);
        this.taskEventDispatcher = Preconditions.checkNotNull(taskEventDispatcher);
        this.taskStateManager = Preconditions.checkNotNull(taskStateManager);
        this.accumulatorRegistry = new AccumulatorRegistry(jobId, executionId);

        this.inputSplitProvider = Preconditions.checkNotNull(inputSplitProvider);
        this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
        this.operatorCoordinatorEventGateway =
                Preconditions.checkNotNull(operatorCoordinatorEventGateway);
        this.aggregateManager = Preconditions.checkNotNull(aggregateManager);
        this.taskManagerActions = checkNotNull(taskManagerActions);
        this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);

        this.classLoaderHandle = Preconditions.checkNotNull(classLoaderHandle);
        this.fileCache = Preconditions.checkNotNull(fileCache);
        this.kvStateService = Preconditions.checkNotNull(kvStateService);
        this.taskManagerConfig = Preconditions.checkNotNull(taskManagerConfig);

        this.metrics = metricGroup;

        this.partitionProducerStateChecker =
                Preconditions.checkNotNull(partitionProducerStateChecker);
        this.executor = Preconditions.checkNotNull(executor);

        // create the reader and writer structures

        final String taskNameWithSubtaskAndId = taskNameWithSubtask + " (" + executionId + ')';

        final ShuffleIOOwnerContext taskShuffleContext =
                shuffleEnvironment.createShuffleIOOwnerContext(
                        taskNameWithSubtaskAndId, executionId, metrics.getIOMetricGroup());

        // produced intermediate result partitions
        final ResultPartitionWriter[] resultPartitionWriters =
                shuffleEnvironment
                        .createResultPartitionWriters(
                                taskShuffleContext, resultPartitionDeploymentDescriptors)
                        .toArray(new ResultPartitionWriter[] {});

        this.consumableNotifyingPartitionWriters =
                ConsumableNotifyingResultPartitionWriterDecorator.decorate(
                        resultPartitionDeploymentDescriptors,
                        resultPartitionWriters,
                        this,
                        jobId,
                        resultPartitionConsumableNotifier);

        // consumed intermediate result partitions
        final IndexedInputGate[] gates =
                shuffleEnvironment
                        .createInputGates(taskShuffleContext, this, inputGateDeploymentDescriptors)
                        .toArray(new IndexedInputGate[0]);

        this.inputGates = new IndexedInputGate[gates.length];
        int counter = 0;
        for (IndexedInputGate gate : gates) {
            inputGates[counter++] =
                    new InputGateWithMetrics(
                            gate, metrics.getIOMetricGroup().getNumBytesInCounter());
        }

        if (shuffleEnvironment instanceof NettyShuffleEnvironment) {
            //noinspection deprecation
            ((NettyShuffleEnvironment) shuffleEnvironment)
                    .registerLegacyNetworkMetrics(
                            metrics.getIOMetricGroup(), resultPartitionWriters, gates);
        }

        invokableHasBeenCanceled = new AtomicBoolean(false);

        // finally, create the executing thread, but do not start it
        executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
    }
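
Note how the constructor does not store the raw input gates: each IndexedInputGate is wrapped in an InputGateWithMetrics before being placed in inputGates, so that every incoming buffer is also counted against the task's numBytesIn counter from metrics.getIOMetricGroup(). Below is a minimal, self-contained sketch of that decorator idea; it is not Flink code, and the Gate interface and class names are invented for illustration.

import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical minimal "gate" abstraction, standing in for Flink's IndexedInputGate.
interface Gate {
    Optional<byte[]> pollNext();
}

// Sketch of the decorator idea behind InputGateWithMetrics: delegate to the wrapped gate,
// but update a byte counter for every buffer that passes through.
class GateWithMetricsSketch implements Gate {

    private final Gate delegate;
    private final AtomicLong numBytesIn; // stands in for the task's numBytesIn counter

    GateWithMetricsSketch(Gate delegate, AtomicLong numBytesIn) {
        this.delegate = delegate;
        this.numBytesIn = numBytesIn;
    }

    @Override
    public Optional<byte[]> pollNext() {
        Optional<byte[]> buffer = delegate.pollNext();
        buffer.ifPresent(bytes -> numBytesIn.addAndGet(bytes.length));
        return buffer;
    }
}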

2.3. startTaskThread

Starts the task thread.

/** Starts the task's thread. */
public void startTaskThread() {
    executingThread.start();
}
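
In the TaskExecutor, this is called near the end of submitTask(), once the Task has been fully constructed and registered; from that point on, the work described in the next section runs on the task's own dedicated thread.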

2.4. doRun

Because Task implements the Runnable interface, it must be started via Thread#start(), which then calls run(). run() simply delegates to doRun(), the core work method that bootstraps the task and executes its code.

/** The core work method that bootstraps the task and executes its code. */
@Override
public void run() {
    try {
        doRun();
    } finally {
        terminationFuture.complete(executionState);
    }
}
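
The try/finally shape matters: no matter how doRun() ends (success, failure, or cancellation), terminationFuture is completed with the final execution state, so whoever owns the task can react to it. The snippet below is a minimal, self-contained analogue of this pattern, not Flink code; the class, field, and state strings are made up for illustration.

import java.util.concurrent.CompletableFuture;

public class RunnableTaskSketch implements Runnable {

    // Completed exactly once, when run() exits -- analogous to Task's terminationFuture.
    final CompletableFuture<String> terminationFuture = new CompletableFuture<>();

    private volatile String executionState = "CREATED";

    @Override
    public void run() {
        try {
            doRun();
        } finally {
            // always report the final state, even if doRun() threw
            terminationFuture.complete(executionState);
        }
    }

    private void doRun() {
        executionState = "RUNNING";
        // ... the actual work would happen here ...
        executionState = "FINISHED";
    }

    public static void main(String[] args) throws Exception {
        RunnableTaskSketch task = new RunnableTaskSketch();
        // same idea as Task#startTaskThread(): run the task on its own dedicated thread
        Thread executingThread = new Thread(task, "Sketch Task Thread");
        executingThread.start();
        System.out.println(task.terminationFuture.get()); // prints FINISHED
    }
}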

doRun

private void doRun() {
        // ----------------------------
        //  Initial State transition
        // ----------------------------
        while (true) {
            // read the current execution state
            ExecutionState current = this.executionState;
            if (current == ExecutionState.CREATED) {
                // transition CREATED -> DEPLOYING
                if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                    // success, we can start our work
                    // break out of the loop and start running the task
                    break;
                }
            } else if (current == ExecutionState.FAILED) {
                // we were immediately failed. tell the TaskManager that we reached our final state
                notifyFinalState();
                if (metrics != null) {
                    metrics.close();
                }
                return;
            } else if (current == ExecutionState.CANCELING) {
                // transition CANCELING -> CANCELED
                if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                    // we were immediately canceled. tell the TaskManager that we reached our final
                    // state
                    notifyFinalState();
                    if (metrics != null) {
                        metrics.close();
                    }
                    return;
                }
            } else {
                if (metrics != null) {
                    metrics.close();
                }
                throw new IllegalStateException(
                        "Invalid state for beginning of operation of task " + this + '.');
            }
        }

        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;

        try {
            // ----------------------------
            //  Task Bootstrap - We periodically
            //  check for canceling as a shortcut
            // ----------------------------

            // activate safety net for task thread
            // Creating FileSystem stream leak safety net for task
            //
            //      Window(TumblingProcessingTimeWindows(5000),  ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)
            //      ->
            //      Sink: Print to Std. Out (1/1)#0 (141dd597dc560a831b2b4bc195943f0b) [DEPLOYING]
            //
            LOG.debug("Creating FileSystem stream leak safety net for task {}", this);
            FileSystemSafetyNet.initializeSafetyNetForThread();


            // first of all, get a user-code classloader
            // this may involve downloading the job's JAR files and/or classes

            // load the JAR files required by the task; example log output:
            // Loading JAR files for task
            //      Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1)#0 (141dd597dc560a831b2b4bc195943f0b) [DEPLOYING].
            LOG.info("Loading JAR files for task {}.", this);


            // obtain the user code class loader (UserCodeClassLoader); example log output:
            // Getting user code class loader for task 141dd597dc560a831b2b4bc195943f0b at library cache manager took 10 milliseconds
            userCodeClassLoader = createUserCodeClassloader();

            final ExecutionConfig executionConfig =
                    serializedExecutionConfig.deserializeValue(userCodeClassLoader.asClassLoader());

            // ... (the rest of doRun is omitted here)

Related articles in this series:

[Flink] Flink 1.12.2 Source Code Walkthrough: A Look at Task

[Flink] Flink 1.12.2 Source Code Walkthrough: Task Data Input

[Flink] Flink 1.12.2 Source Code Walkthrough: Task Data Output

[Flink] Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode, TaskManager Startup

[Flink] Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode, YARN Submission Process

[Flink] Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode, JobMaster Startup (YarnJobClusterEntrypoint)