Apache Flink fault tolerance源码剖析

Posted vinoYang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Flink fault tolerance源码剖析相关的知识,希望对你有一定的参考价值。

因某些童鞋的建议,从这篇文章开始结合源码谈谈Flink Fault Tolerance相关的话题。上篇官方介绍的翻译是理解这个话题的前提,所以如果你想更深入得了解Flink Fault Tolerance的机制,推荐先读一下前篇文章理解它的实现原理。当然原理归原理,原理体现在代码实现里并不是想象中的那么直观。这里的源码剖析也是我学习以及理解的过程。

作为源码解析Flink Fault Tolerance的首篇文章,我们先暂且不谈太有深度的东西,先来了解一下:Flink哪里涉及到检查点/快照机制来保存/恢复状态?这就是本篇要谈的主要内容。

跟Flink官方文档的说明一样,在文章中会混杂着检查点快照这两个术语,不要太过于纠结它们,某种程度它们是一致的。

从这篇文章开始,当我谈及一个类,我会给出它的全限定名,以方便大家对照。

在Flink中,需要具备Fault Tolerance能力的通常是两类对象:function以及operator

其中function通常通过实现Checkpointed来达到这个目的,而operator通过实现StreamOpeator(该接口中包含了快照、恢复状态的接口方法)。

我们会分别来分析这两个接口,然后列举一些典型的需要具备Fault Tolerance功能的对象,并分析它们的实现。

Checkpointed

org.apache.flink.streaming.api.checkpoint.Checkpointed

该接口提供给那些需要持久化状态的functionoperator使用。该接口是同步模式的快照机制。

两个接口方法:

  • snapshotState:获得function或者operator的当前状态(快照),这个状态必须反映该function之前的变更所产生的最终结果

该方法接收两个参数,第一个参数是checkpointId,表示该检查点的ID,第二个参数checkpointTimestamp,检查点的时间戳,被JobManagerSystem.currentTimeMillis()驱动

  • restoreState:用于从之前的检查点中恢复functionoperator的状态,需要注意的是该方法的调用会早于open方法

CheckpointedAsynchronously

org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously

该接口继承自Checkpointed,属于标记型接口,用于跟Checkpointed同步快照机制进行区分。

MessageAcknowledgingSourceBase

org.apache.flink.streaming.api.functions.source

该类是Flink内置的众多SouceFunction之一,也是基于具备ack(确认机制)的Message Queue的实现模板之一。

该类的完整签名:

public abstract class MessageAcknowledgingSourceBase<Type, UId>
    extends RichSourceFunction<Type>
    implements Checkpointed<SerializedCheckpointData[]>, CheckpointListener

首先我们从该类的实现接口中可以看到它希望保存的状态是:SerializedCheckpointData[]:

SerializedCheckpointData表示作为快照数据的被序列化元素的集合

有两个非常重要的属性:

/** The list gathering the IDs of messages emitted during the current checkpoint */
private transient List<UId> idsForCurrentCheckpoint;

/** The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion */
private transient ArrayDeque<Tuple2<Long, List<UId>>> pendingCheckpoints;


/**
 * Set which contain all processed ids. Ids are acknowledged after checkpoints. When restoring
 * a checkpoint, ids may be processed again. This happens when the checkpoint completed but the
 * ids for a checkpoint haven‘t been acknowledged yet.
 */
private transient Set<UId> idsProcessedButNotAcknowledged;
  • idsForCurrentCheckpoint:该集合存储着当前检查点覆盖范围内,消费掉的消息的ID集合
  • pendingCheckpoints:该集合收集的是:检查点以及该检查点中那些已经被触发处理的但没有完成的(或没有收到完成通知的)消息的ID集合对(Tuple2)
  • idsProcessedButNotAcknowledged:它用于存储那些已经被处理过的消息的ID,这些消息的ack在检查点完成之后。也就是说,如果从该检查点开始恢复,那么这些id的消息可能会被重放。

看到上面的定义,虽然都是用来存储跟消息ID相关的“集合”,但却是三种不同的数据结构,而且前两个是有序的,最后一个是无序的

后面的文章我们会谈到检查点分:PendingCheckpoint(未完成的)和CompletedCheckpoint(已完成的)

来看Checkpointed的两个接口方法的实现:

    public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, timestamp: {}",
                    idsForCurrentCheckpoint, checkpointId, checkpointTimestamp);

        pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint));

        idsForCurrentCheckpoint = new ArrayList<>(64);

        return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
    }

首先是构建快照的方法,它将当前检查点checkpointId及其相关的消费走的消息集合idsForCurrentCheckpoint构建成一个元组Tuple2加入待处理的检查点中pendingCheckpoints。接着重新初始化了idsForCurrentCheckpoint(因为当前这个检查点的快照已经生成了,所以跟当前检查点相关的元素也需要清空掉)。

    public void restoreState(SerializedCheckpointData[] state) throws Exception {
        pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
        // build a set which contains all processed ids. It may be used to check if we have
        // already processed an incoming message.
        for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
            idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
        }
    }

接下来是恢复状态的逻辑,它的过程也很简单。首先反序列化状态对象到pendingCheckpoints,然后遍历整个元组集合,针对每个没有完成的检查点元组checkpoint,提取出每个这些checkpoint对应的消息ID集合,将他们全部加入idsProcessedButNotAcknowledged集合中去。

从构建快照的snapshotState方法中,我们看到针对每个checkpointId都将其涵盖范围内的所有的ID集合拼装成一个二元组加入到pendingCheckpoints。而随着消息被消费,如果到最后该checkpointId对应的所有消息ID都被完全处理也就是说该检查点变成了CompletedCheckpoint,那么如何将该二元组从pendingCheckpoints移除?Flink提供了一个CheckpointListener它会在某个检查点完成之后给出通知,客户程序可以订阅它然后进行相应的回调处理notifyCheckpointComplete,该类的回调实现如下:

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.debug("Committing Messages externally for checkpoint {}", checkpointId);

        for (Iterator<Tuple2<Long, List<UId>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
            Tuple2<Long, List<UId>> checkpoint = iter.next();
            long id = checkpoint.f0;

            if (id <= checkpointId) {
                LOG.trace("Committing Messages with following IDs {}", checkpoint.f1);
                acknowledgeIDs(checkpointId, checkpoint.f1);
                // remove deduplication data
                idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
                // remove checkpoint data
                iter.remove();
            }
            else {
                break;
            }
        }
    }

获取到已完成的checkpointId,然后遍历整个pendingCheckpoints集合,找到所有checkpointId小于当前已完成的checkpointId,然后完成三个动作:

  • ack 该checkpointId对应的所有这些消息的ID
  • 将这些消息的ID从idsProcessedButNotAcknowledged中移除
  • 将该二元组从pendingCheckpoints中移除

为什么这里判断条件是<=呢,因为checkpointId是时序递增的,而且Flink保证如果某个检查点完成,那么比该检查点小的检查点肯定也完成了。因为,检查点越小与其有关的消息集合越早被处理。

另外一个需要注意的是,该方法中的acknowledgeIDs是抽象方法,待具体类根据自己的ack机制实现。

RabbitMQ 对接Flink的Source —— RMQSource就是通过继承MessageAcknowledgingSourceBase 实现的

FlinkKafkaConsumerBase

org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase

FlinkKafkaConsumerBase是Flink实现基于Kafka的Source的基类,Kafka提供基于offset并且可重复消费的机制,使其非常容易实现Fault Tolerance机制,只不过这里实现的是异步模式的检查点机制CheckpointedAsynchronously

该类的完整签名如下:

public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
        implements CheckpointListener, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T>

从签名来看,它快照的数据类型是:

HashMap<KafkaTopicPartition, Long>

该类型描述了kafka消息消费的具体的信息(包含topic,partition,offset)。

然后继续看snapshotState方法:

// the use of clone() is okay here is okay, we just need a new map, the keys are not changed
        //noinspection unchecked
        HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) offsetsState.clone();

        // the map cannot be asynchronously updated, because only one checkpoint call can happen
        // on this function at a time: either snapshotState() or notifyCheckpointComplete()
        pendingCheckpoints.put(checkpointId, currentOffsets);

        while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            pendingCheckpoints.remove(0);
        }

这是其核心代码。它先将offsetsState克隆一份,作为当前检查点的快照,然后放入pendingCheckpoints作为待处理的检查点集合。

这里有一个安全性检查:如果待处理的安全点集合大于默认设定的阈值(100),则移除集合中第一个检查点,这么做的目的是为了防止集合太大导致内存泄漏

restoreState方法的实现,只是将待恢复的偏移量快照对象赋予当前对象的偏移量而已。

MessageAcknowledgingSourceBase,为了得到检查点完成的通知,FlinkKafkaConsumerBase也实现了CheckpointListener接口,以在检查点完成时进行回调处理。来看看notifyCheckpointComplete方法的实现:

            HashMap<KafkaTopicPartition, Long> checkpointOffsets;

            // the map may be asynchronously updates when snapshotting state, so we synchronize
            synchronized (pendingCheckpoints) {
                final int posInMap = pendingCheckpoints.indexOf(checkpointId);
                if (posInMap == -1) {
                    LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
                    return;
                }

                //noinspection unchecked
                checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);


                // remove older checkpoints in map
                for (int i = 0; i < posInMap; i++) {
                    pendingCheckpoints.remove(0);
                }
            }
            if (checkpointOffsets == null || checkpointOffsets.size() == 0) {
                LOG.debug("Checkpoint state was empty.");
                return;
            }
            commitOffsets(checkpointOffsets);

可以看到核心代码都进行了同步保护,因为pendingCheckpoints很可能会被异步更新。它先根据完成了的检查点,获得其在pendingCheckpoints中的索引。如果判断索引不存在,则直接退出。否则,移除该索引对应的快照信息,然后将小于当前索引(较旧的)的快照信息也一并移除(这一点我之前解释过,因为所有的检查点都是按时间递增有序的)。最后将当前完成的检查点对应的消息的偏移量进行commit,也即commitOffsets。只不过这里该方法被定义为抽象方法,因为Kafka不同版本的API差别的原因,由适配不同版本的consumer各自实现。

Flink以Kafka作为Source的具体实现机制,不是本文的重点,后续可以另开文章进行讲解

StatefulSequenceSource

org.apache.flink.streaming.api.functions.source

这是一个有状态的、给定起始和截止元素的并行序列发射器,由于它需要提供exactly once保证,所以它实现了Checkpointed接口。
它主要是用来维护一个称之为collected的发射进度状态,对其进行快照以便于实现fault tolerance

StreamOperator

org.apache.flink.streaming.api.operators. StreamOperator

StreamOperator内置了我们上面谈到的几个跟检查点相关的接口方法:

  • snapshotOperatorState
  • restoreState
  • notifyOfCompletedCheckpoint

这三个方法来自于我们之前谈到的Checkpointed以及CheckpointListener。这也由此可见,在operator中快照机制由可选项变成了必选项。

这是不难理解的,因为operator处于运行时,诸如分区信息都是必须要做快照的。

这里需要注意的是snapshotOperatorState方法,它返回值为StreamTaskState。它是表示task所有状态的一个容器对象,它包含了三类状态:

  • operatorState
  • functionState
  • kvStates

这不是本文的重点,后面的文章再谈

AbstractStreamOperator

org.apache.flink.streaming.api.operators.AbstractStreamOperator

AbstractStreamOperatorStreamOperator的抽闲类,为operator的实现提供模板,当然也为以上的三个跟快照相关的接口方法的实现提供了模板。

来看snapshotOperatorState方法:

    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        // here, we deal with key/value state snapshots

        StreamTaskState state = new StreamTaskState();

        if (stateBackend != null) {
            HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
                stateBackend.snapshotPartitionedState(checkpointId, timestamp);
            if (partitionedSnapshots != null) {
                state.setKvStates(partitionedSnapshots);
            }
        }


        return state;
    }

可以看到它依赖于一个叫stateBackend的东西,在之前一篇文章中我们有谈及过,它是state最终的持久化机制的实现。并且从注释可以看到这里只提供了针对key/value状态的快照模板。

AbstractUdfStreamOperator

org.apache.flink.streaming.api.operators

该抽象类继承自AbstractStreamOperator,用于进一步为operator的实现提供模板,不过从类名可以看出来,它主要是为用户定义函数(udf)的operator提供模板。

snapshotOperatorState方法:

    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp);

        if (userFunction instanceof Checkpointed) {
            @SuppressWarnings("unchecked")
            Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;

            Serializable udfState;
            try {
                udfState = chkFunction.snapshotState(checkpointId, timestamp);
            } 
            catch (Exception e) {
                throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
            }

            if (udfState != null) {
                try {
                    AbstractStateBackend stateBackend = getStateBackend();
                    StateHandle<Serializable> handle = 
                            stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
                    state.setFunctionState(handle);
                }
                catch (Exception e) {
                    throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
                            + e.getMessage(), e);
                }
            }
        }

        return state;
    }

这里我们终于再次看到了Checkpointed接口。这是因为function只是静态的函数,它的运行还必须借助于operator,因此其状态也必须借助于operator来帮助其与Flink的运行时交互以达到最终的持久化的目的。

函数状态的持久化代码:

AbstractStateBackend stateBackend = getStateBackend();
StateHandle<Serializable> handle = 
    stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
state.setFunctionState(handle);

小结

本篇剖析了Flink针对Function以及Operator如何做快照以及如何恢复的实现。虽然,还没有涉及到fault tolerance的最终实现机制,但是这是我们的入口。


微信扫码关注公众号:Apache_Flink

技术分享


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

技术分享


以上是关于Apache Flink fault tolerance源码剖析的主要内容,如果未能解决你的问题,请参考以下文章

Apache Flink fault tolerance源码剖析

Apache Flink fault tolerance源码剖析

Apache Flink fault tolerance源码剖析完结篇

Apache Flink数据流的Fault Tolerance机制

Flink学习入门教程之Fault Tolerance via State Snapshots

关于使用Axis2 webservice 处理Fault响应时抛org.apache.axis2.AxisFault的分析