Flink保证exactly-once机制介绍:checkpoint及TwoPhaseCommitSinkFunction

Posted 宝哥大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink保证exactly-once机制介绍:checkpoint及TwoPhaseCommitSinkFunction相关的知识,希望对你有一定的参考价值。

文章目录

一、前言

我们知道exactly-once,大多知道有checkpoint,但是在Flink1.4之后,又新增了端到端的exactly-once。也就是输入和输出是对应的,没有丢失和重复。

1.1、Flink-1.4之前的exactly-once实现

Flink的基本思路就是将状态定时地checkpiont到hdfs中去,当发生failure的时候恢复上一次的状态,然后将输出update到外部。这里需要注意的是输入流的offset也是状态的一部分,因此一旦发生failure就能从最后一次状态恢复,从而保证输出的结果是exactly once。

1.2、Flink-1.4之后的exactly-once实现

2017年12月,apache flink 1.4.0发布。其中有一个里程碑式的功能:两步提交的sink function(TwoPhaseCommitSinkFunction,relevant Jira here)。TwoPhaseCommitSinkFunction 就是把最后写入存储的逻辑分为两部提交,这样就有可能构建一个从数据源到数据输出的一个端到端的exactly-once语义的flink应用。当然,TwoPhaseCommitSinkFunction的数据输出包括apache kafka 0.11以上的版本。flink提供了一个抽象的TwoPhaseCommitSinkFunction类,来让开发者用更少的代码来实现端到端的exactly-once语义。

Flink的 checkpoint在保证exactly-once内部应用exactly-once,不需要重复计算等
Flink是通过两步提交协议来保证从数据源到数据输出的exactly-once语义(外部)

接下来,我们通过一个例子来解释如果应用TwoPhaseCommitSinkFunction来实现一个exactly-oncesink

二、Exactly-once Tow Phase Commit

下面我们来看看flink消费并写入kafka的例子是如何通过两部提交来保证exactly-once语义的。

注意: 因为只有kafka0.11开始支持事物操作,若要使用flink端到端exactly-once语义需要flinksinkkafka0.11版本以上的。 同时 DELL/EMCPravega 也支持使用flink来保证端到端的exactly-once语义。

这个例子包括以下几个步骤:

  • kafka读取数据
  • 一个聚合窗操作
  • 向kafka写入数据

为了保证exactly-once,所有写入kafka的操作必须是事务的。在两次checkpiont之间要批量提交数据,这样在任务失败后就可以将没有提交的数据回滚。

然而一个简单的提交和回滚,对于一个分布式的流式数据处理系统来说是远远不够的。下面我们来看看flink是如何解决这个问题的。

Flink官方推荐所有需要保证exactly once的Sink逻辑都继承该抽象类。它定义了如下4个抽象方法,需要子类实现。

	// 开始一个事务,返回事务信息的句柄。
    protected abstract TXN beginTransaction() throws Exception;
	// 预提交(即提交请求)阶段的逻辑。
    protected abstract void preCommit(TXN transaction) throws Exception;
	// 正式提交阶段的逻辑。
    protected abstract void commit(TXN transaction);
	// 取消事务。
    protected abstract void abort(TXN transaction);

2.1、预提交 (preCommit)

首先我们看下 preCommit 代码实现。

    @Override
    protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction)
            throws FlinkKafkaException 
        switch (semantic) 
            case EXACTLY_ONCE:
            case AT_LEAST_ONCE:
                flush(transaction); // 实际上是代理了KafkaProducer.flush()方法。
                break;
			// .....
        
    

preCommitTwoPhaseCommitSinkFunction#snapshotState()中调用

TwoPhaseCommitSinkFunction也继承了CheckpointedFunction接口,所以2PC是与检查点机制一同发挥作用的。

每当需要做checkpoint时,JobManager就在数据流中打入一个屏障(barrier),作为检查点的界限。屏障随着算子链向下游传递,每到达一个算子都会触发将状态快照写入状态后端(state BackEnd)的动作。当屏障到达Kafka sink后,触发preCommit(实际上是KafkaProducer.flush())方法刷写消息数据,但还未真正提交。接下来还是需要通过检查点来触发提交阶段。

2.2、提交阶段

    @Override
    protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) 
        if (transaction.isTransactional()) 
            try 
            	// 实际上是代理了KafkaProducer.commitTransaction()方法,正式向Kafka提交事务。
                transaction.producer.commitTransaction();
             finally 
                recycleTransactionalProducer(transaction.producer);
            
        
    

该方法的调用点位于TwoPhaseCommitSinkFunction.notifyCheckpointComplete()方法中。顾名思义,当所有检查点都成功完成之后,会回调这个方法。

该方法每次从正在等待提交的事务句柄中取出一个,校验它的检查点ID,并调用commit()方法提交之。

参考

https://www.aboutyun.com/forum.php?mod=viewthread&tid=27395
https://dandelioncloud.cn/article/details/1441622512370266113
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

以上是关于Flink保证exactly-once机制介绍:checkpoint及TwoPhaseCommitSinkFunction的主要内容,如果未能解决你的问题,请参考以下文章

使用Flink进行高吞吐,低延迟和Exactly-Once语义流处理

Flink 容错机制 Checkpoint 生成与恢复流程

Flink——Exactly-Once

Flink Exactly-Once 投递实现浅析

Flink Exactly-Once 投递实现浅析

Flink Exactly-Once 投递实现浅析