端到端的精确一次保证

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了端到端的精确一次保证相关的知识,希望对你有一定的参考价值。

参考技术A Flink 任务 failover 之后,可能会重复写出数据到 Sink 中,你们公司是怎么做到端对端 exactly-once 的?

端对端 exactly-once 有 3 个条件:

⭐ Source 引擎可以重新消费,比如 Kafka 可以重置 offset 进行重新消费

⭐ Flink 任务配置 exactly-once,保证 Flink 任务 State 的 exactly-once

⭐ Sink 算子支持两阶段或者可重入,保证产出结果的 exactly-once

其中前两项一般大多数引擎都支持,我们需要关注的就是第 3 项,目前有两种常用方法:

⭐ Sink 两阶段:由于两阶段提交是随着 Checkpoint 进行的,假设 Checkpoint 是 5min 做一次,那么数据对下游消费方的可见性延迟至少也是 5min,所以会有数据延迟等问题,目前用的比较少。

⭐ Sink 支持可重入:举例:

⭐ Sink 为 mysql:可以按照 key update 数据

⭐ Sink 为 Druid:聚合类型可以选用 longMax

⭐ Sink 为 ClickHouse:查询时使用 longMax 或者使用 ReplacingMergeTree 表引擎将重复写入的数据去重,这里有小伙伴会担心 ReplacingMergeTree 会有性能问题,但是博主认为其实性能影响不会很大,因为 failover 导致的数据重复其实一般情况下是小概率事件,并且重复的数据量也不会很大,也只是一个 Checkpoint 周期内的数据重复,所以使用 ReplacingMergeTree 是可以接受的)

⭐ Sink 为 Redis:按照 key 更新数据

其他解答:Flink状态一致性、端到端的精确一次保证

状态一致性:当在分布式系统中引入状态时,自然也引入了一致性问题。一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?       对于流处理内部来说,所谓的状态一致性,其实就是我们所说的计算结果保证准确。在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。    一条数据不应该丢失,也不应该重复计算

一致性可以分为 3 个级别: at-most-once(最多一次):计数结果可能丢失

at-least-once (至少一次):计数程序在发生故障后可能多算,但是绝不会少算。

exactly-once (精确一次):系统保证在发生故障后得到的计数结果与正确值一致。

数据流(DataStream)内部保证exactly-once (精确一次)的方法:Flink 使用了一种轻量级快照机制 ---- 检查点(checkpoint)来保证 exactly-once 语义

有状态应用的一致检查点,其实就是:所有任务的状态,在某个时间点的一份拷贝(一份快照)。而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。

端到端保证一致性:

内部保证  —— 依赖 checkpoint

source 端  —— 需要外部源可重设数据的读取位置

sink 端  —— 需要保证从故障恢复时,数据不会重复写入外部系统

而对于 sink 端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。

幂等操作:是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。 例如Hashmap 的写入插入操作是幂等的操作,重复写入,写入的结果还一样。

事务写入:构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中

对于事务性写入,具体又有两种实现方式: 预写日志(WAL)和两阶段提交(2PC) 。DataStream API 提供了 GenericWriteAheadSink 模板类和TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。其中 预写日志(WAL)只能保证至少一次精确。

Flink+Kafka 端到端状态一致性的保证

内部 —— 利用 checkpoint 机制,把状态存盘,发生故障的时候可以恢复, 保证内部的状态一致性

source —— kafka consumer 作为 source,可以将偏移量保存下来,如果后 续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据, 保证一致性

sink —— kafka producer 作为 sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction

由于端到端保证一致性需要用到两阶段提交(2PC)TwoPhaseCommitSinkFunction,我们来了解一下两阶段提交的方式:

第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”

jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanager

sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据

jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成

sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据

外部 kafka 关闭事务,提交的数据可以正常消费了。

我们也可以看到,如果宕机需要通过 StateBackend 进行恢复,只能恢复所有确认提交的操作,之于有关后端状态的选择,后面再单独聊聊

flink如何利用checkpoint保证数据状态一致性

flink数据状态一致性

1状态一致性级别

1.1 AT-MOST-ONCE (最多一次):

  • 这本质上是一『尽力而为』的方法。保证数据或事件最多由应用程序中的所有算子处理一次。 这意味着如果数据在被流应用程序完全处理之前发生丢失,则不会进行其他重试或者重新发送。下图中的例子说明了这种情况。

1.2 AT-LEAST-ONCE (至少一次):

  • 应用程序中的所有算子都保证数据或事件至少被处理一次。这通常意味着如果事件在流应用程序完全处理之前丢失,则将从源头重放或重新传输事件。然而,由于事件是可以被重传的,因此一个事件有时会被处理多次,这就是所谓的至少一次。

下图的例子描述了这种情况:第一个算子最初未能成功处理事件,然后在重试时成功,接着在第二次重试时也成功了,其实是没有必要的。

1.3 EXACTLY-ONCE (精确一次):

即使是在各种故障的情况下,流应用程序中的所有算子都保证事件只会被『精确一次』的处理。(也有文章将 Exactly-once 翻译为:完全一次,恰好一次)

通常使用两种流行的机制来实现『精确一次』处理语义。

  • 分布式快照 / 状态检查点
  • 至少一次事件传递和对重复数据去重

实现『精确一次』的分布式快照/状态检查点方法受到 Chandy-Lamport 分布式快照算法的启发[1]。通过这种机制,流应用程序中每个算子的所有状态都会定期做 checkpoint。如果是在系统中的任何地方发生失败,每个算子的所有状态都回滚到最新的全局一致 checkpoint 点。在回滚期间,将暂停所有处理。源也会重置为与最近 checkpoint 相对应的正确偏移量。整个流应用程序基本上是回到最近一次的一致状态,然后程序可以从该状态重新启动。下图描述了这种 checkpoint 机制的基础知识。


在上图中,流应用程序在 T1 时间处正常工作,并且做了checkpoint。然而,在时间 T2,算子未能处理输入的数据。此时,S=4 的状态值已保存到持久存储器中,而状态值 S=12 保存在算子的内存中。为了修复这种差异,在时间 T3,处理程序将状态回滚到 S=4 并“重放”流中的每个连续状态直到最近,并处理每个数据。最终结果是有些数据已被处理了多次,但这没关系,因为无论执行了多少次回滚,结果状态都是相同的。

另一种实现『精确一次』的方法是:在每个算子上实现至少一次事件传递和对重复数据去重来。使用此方法的流处理引擎将重放失败事件,以便在事件进入算子中的用户定义逻辑之前,进一步尝试处理并移除每个算子的重复事件。此机制要求为每个算子维护一个事务日志,以跟踪它已处理的事件。利用这种机制的引擎有 Google 的 MillWheel[2] 和 Apache Kafka Streams。下图说明了这种机制的要点。

这里所说的的状态一致性是flink系统内部的状态,如果要保证端到端,从接受到发出数据都保障一致性,还需要其他系统能力支持,这部分后面说。

绝大多数情况我们会希望exactly-once,但相比at-least-once,exactly-once的性能与速度会相对较慢一点,这是由于checkpoint的机制造成的。

我们主要关注的是精确一致性,所以在这里我们也只讲精确一致性相关的概念。

我们所说的flink内的精确一致性,真的是精确的只发送一次数据么?

现在让我们重新审视『精确一次』处理语义真正对最终用户的保证。『精确一次』这个术语在描述正好处理一次时会让人产生误导。

有些人可能认为『精确一次』描述了事件处理的保证,其中流中的每个事件只被处理一次。实际上,没有引擎能够保证正好只处理一次。在面对任意故障时,不可能保证每个算子中的用户定义逻辑在每个事件中只执行一次,因为用户代码被部分执行的可能性是永远存在的。

那么,当引擎声明『精确一次』处理语义时,它们能保证什么呢?如果不能保证用户逻辑只执行一次,那么什么逻辑只执行一次?当引擎声明『精确一次』处理语义时,它们实际上是在说,它们可以保证引擎管理的状态更新只提交一次到持久的后端存储。

上面描述的两种机制都使用持久的后端存储作为真实性的来源,可以保存每个算子的状态并自动向其提交更新。对于机制 1 (分布式快照 / 状态检查点),此持久后端状态用于保存流应用程序的全局一致状态检查点(每个算子的检查点状态)。对于机制 2 (至少一次事件传递加上重复数据删除),持久后端状态用于存储每个算子的状态以及每个算子的事务日志,该日志跟踪它已经完全处理的所有事件。

提交状态或对作为真实来源的持久后端应用更新可以被描述为恰好发生一次。然而,如上所述,计算状态的更新 / 更改,即处理在事件上执行任意用户定义逻辑的事件,如果发生故障,则可能不止一次地发生。换句话说,事件的处理可以发生多次,但是该处理的效果只在持久后端状态存储中反映一次。因此,我们认为有效地描述这些处理语义最好的术语是『有效一次』(effectively once)。

1.4 分布式快照与至少一次事件传递和重复数据删除的比较

从语义的角度来看,分布式快照和至少一次事件传递以及重复数据删除机制都提供了相同的保证。然而,由于两种机制之间的实现差异,存在显着的性能差异。

  • 机制 1(分布式快照 / 状态检查点)的性能开销是最小的,因为引擎实际上是往流应用程序中的所有算子一起发送常规事件和特殊事件,而状态检查点可以在后台异步执行。但是,对于大型流应用程序,故障可能会更频繁地发生,导致引擎需要暂停应用程序并回滚所有算子的状态,这反过来又会影响性能。流式应用程序越大,故障发生的可能性就越大,因此也越频繁,反过来,流式应用程序的性能受到的影响也就越大。然而,这种机制是非侵入性的,运行时需要的额外资源影响很小。

  • 机制 2(至少一次事件传递加重复数据删除)可能需要更多资源,尤其是存储。使用此机制,引擎需要能够跟踪每个算子实例已完全处理的每个元组,以执行重复数据删除,以及为每个事件执行重复数据删除本身。这意味着需要跟踪大量的数据,尤其是在流应用程序很大或者有许多应用程序在运行的情况下。执行重复数据删除的每个算子上的每个事件都会产生性能开销。但是,使用这种机制,流应用程序的性能不太可能受到应用程序大小的影响。对于机制 1,如果任何算子发生故障,则需要发生全局暂停和状态回滚;对于机制 2,失败的影响更加局部性。当在算子中发生故障时,可能尚未完全处理的事件仅从上游源重放/重传。性能影响与流应用程序中发生故障的位置是隔离的,并且对流应用程序中其他算子的性能几乎没有影响。从性能角度来看,这两种机制的优缺点如下。

分布式快照 / 状态检查点的优缺点:

  • 优点:
    • 较小的性能和资源开销
  • 缺点:
    • 对性能的影响较大
    • 拓扑越大,对性能的潜在影响越大

至少一次事件传递以及重复数据删除机制的优缺点:

  • 优点:
    • 故障对性能的影响是局部的
    • 故障的影响不一定会随着拓扑的大小而增加
  • 缺点:
    • 可能需要大量的存储和基础设施来支持
    • 每个算子的每个事件的性能开销

虽然从理论上讲,分布式快照和至少一次事件传递加重复数据删除机制之间存在差异,但两者都可以简化为至少一次处理加幂等性。对于这两种机制,当发生故障时(至少实现一次),事件将被重放/重传,并且通过状态回滚或事件重复数据删除,算子在更新内部管理状态时本质上是幂等的。

2flink内部实现状态一致性

从我另一篇文章
Flink checkpoint具体操作流程详解与报错调试方法汇总

中有详细的阐述了,flink内checkpoint的执行流程,这里就不展开讲了,对这部分知识有了解的读者应该清楚,节点做checkpoint的快照部分前是同步的,也就是,这个节点会等待所有上游并发节点的 checkpoint barrier全部到来才会发出做快照的命令,之后才是异步做快照的阶段。

在我们程序处理中通常要求能够满足Exactly once语义,保证数据的准确性,flink 通过checkpoint机制提供了Exactly-Once与At-Least-Once 两种不同的消费语义实现, 可以将程序处理的所有数据都保存在状态内部,当程序发生异常失败重启可以从最近一次成功checkpoint中恢复状态数据,通过checkpoint中barrier对齐机制来实现这两不同的语义,barrier对齐发生在一个处理节点需要接收上游不同处理节点的数据,由于不同的上游节点数据处理速度不一致,那么就会导致下游节点接收到 barrier的时间点也会不一致,这时候就需要使用barrier对齐机制:在同一checkpoint中,先到达的barrier是否需要等待其他处理节点barrier达到后在发送后续数据,barrier将数据流分为前后两个checkpoint(chk n,chk n+1)的概念,如果不等待那么就会导致chk n的阶段处理了chk n+1阶段的数据,但是在source端所记录的消费偏移量又一致,如果chk n成功之后,后续的任务处理失败,任务重启会消费chk n+1阶段数据,就会到致数据重复消息,如果barrier等待就不会出现这样情况,因此barrier需要对齐那么就是实现Exactly once语义,否则实现的是at least once语义。由于状态是属于flink内部存储,所以flink 仅仅满足内部Exactly once语义。

至此实现了flink系统内部的Exactly once语义。

3 端到端的一致性

端到端的数据一致性,主要分三部分

  • source
  • flink内部:这部分由flink内部实现,这节就不提了
  • sink

3.1 Source

需要外部源支持可重设数据的读取位置,例如kafka,或增量保存数据的数据源,自己记录offset,例如 mysql 记录消费到了 多少的id。 kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性

3.2 Sink

sink端要保证,任务从故障恢复时,数据不会重新写入到持久化存储中。一半包括两种情况:幂等写入,事务写入

3.2.1 幂等写入

所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。这种一般用于,下游的持久化存储没有事务支持的情况,例如redis。

这种情况一般的数据格式可以支持重复输出,例如统计关注的 uid: uid,重复输出也不会影响数据的准确性

3.2.2 事务写入

事务写入需要下游系统支持事务,例如kafka,mysql等。利用flink的两阶段提交,来实现数据的exactly-once.

3.2.2.1 两阶段提交

在分布式系统中,可以使用两阶段提交来实现事务性从而保证数据的一致性,两阶段提交分为:预提交阶段与提交阶段,通常包含两个角色:协调者与执行者,协调者用于用于管理所有执行者的操作,执行者用于执行具体的提交操作,具体的操作流程:

  1. 首先协调者会送预提交(pre-commit)命令有的执行者
  2. 执行者执行预提交操作然后发送一条反馈(ack)消息给协调者
  3. 待协调者收到所有执行者的成功反馈,则发送一条提交信息(commit)给执行者
  4. 执行者执行提交操作

如果在流程2中部分预提交失败,那么协调者就会收到一条失败的反馈,则会发送一条rollback消息给所有执行者,执行回滚操作,保证数据一致性;但是如果在流程4中,出现部分提交成功部分提交失败,那么就会造成数据的不一致,因此后面也提出了3PC或者通过其他补偿机制来保证数据最终一致性,接下看看flink 是如何做到2PC,保证数据的一致性。

3.2.2.2 flink的两阶段提交

以sink kafka为例,flink两阶段提交步骤详解:

  1. 第一条数据来了之后,开启一个kafka的事务(transaction) ,正常写入kafka分区日志但标记为未提交,这就是”预提交’
  2. jobmanager 触发checkpoint操作,barrier 从source开始向下传递,遇到barrier的算子将状态存入状态后端,并通知jobmanager
  3. sink 连接器收到barrier,保存当前状态,存入checkpoint,通知jobmanager, 并开启下一阶段的事务,用于提交下个检查点的数据
  4. jobmanager 收到所有任务的通知,发出确认信息,表示checkpoint 完成
  5. sink任务收到jobmanager的确认信息,正式提交这段时间的数据
    外部kafka关闭事务,提交的数据可以正常消费了。

如果在这期间出现任何的数据问题,flink都会回滚数据,之前预提交的数据不会被正式写入到kafka中,但如果没有问题,也只需要提交一个事务,sink kafka的下游就可以正常消费,sink算子不能存数据,这样的话,数据即发到了下游,有没有被消费到,出问题又可以回滚,但如果是redis,下游数据已经写入到存储中, 就上flink回滚,写入的数据也无法撤回,这就是两阶段提交的重要性。

个人理解,如果用两阶段提交,数据的实时性就没有那么高,因为需要根据checkpoint的时间间隔来一批一批的写入数据,没有办法每条数据都及时的处理完并sink出,所以对实时性有要求的,可以自定义实现幂等性sink来保证数据的一致性。

引用:
https://mp.weixin.qq.com/s/hFtPx6PSyobt6Pm6XvUrHg
https://blog.csdn.net/wypblog/article/details/103900577

以上是关于端到端的精确一次保证的主要内容,如果未能解决你的问题,请参考以下文章

干货:Flink+Kafka 0.11端到端精确一次处理语义实现

译Flink + Kafka 0.11端到端精确一次处理语义的实现

端到端E2E

28是否要使用端到端的深度学习?

Kafka端到端审计

通信系统物理层的端到端优化方法