基于Flink的实时数据同步原理
Posted 咬定青松
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Flink的实时数据同步原理相关的知识,希望对你有一定的参考价值。
数据同步用一个简单的模型可以描述为源端(Source)到目标端(Sink)的数据复制过程。源端通常是数据库比如mysql、目标端通常是分布式存储系统如HDFS等,在源端和目标端有时需要进行一些数据变换,如下图:
这个过程如果按照同步方式One by One地从源库read记录,然后write目标库,因为是单条记录,很容易控制写入的原子性事务:要么写成功,要么写失败,而且源端read的位置也不会丢失。这种同步方式的最大问题是效率低,因为源端和目标端系统的性能不一致,整体效率取决于较慢的一端,为了提高读写效率和吞吐量,需要将Source和Sink解耦,如基于内存Channel或者基于消息系统Kafka(Kafka本身也是Sink)的生产者和消费者模式,于是上述过程变成如下方式:
基于内存Channel的方式,当同步过程中出现读写异常或者宕机,会导致应用程序出错和产生源端和目标端不一致的脏数据,而且通常无法恢复,因为没有持久化同步过程中的状态信息,比如Datax、Sqoop等采用类似方式。还有一种基于内存Channel的实现框架是Debezium,其定位为支持多种数据库系统的实时数据捕获:
相比现有的Datax和Sqoop等,它有两大点显著性优势:实时和故障可恢复,基于Debezium的数据同步模型为:
先说实时:Source如何实时读取binlog?
从上图可知,Mysql一次主从复制需要有三个线程来实现,其中一个线程(Binlog dump thread)在主服务器上,其它两个线程(Slave I/O thread , Slave SQL thread)在从服务器上。如果一台主服务器配两台从服务器,那主服务器上就会有两个Binlog dump 线程,而每个从服务器上各自有两个线程。
Binlog dump 线程是当有从服务器连接的时候由主服务器创建,用于向Slave发送binlog日志数据,Slave I/O 线程去连接主服务器的Binlog dump 线程并要求其发送binlog日志中记录的更新操作,然后它将Binlog dump 线程发送的数据拷贝到从服务器上的本地文件relay log中。Slave SQL线程是在从服务器上面创建的,主要负责读取由Slave I/O写的relay log文件并执行其中的事件。Slave跟Master创建连接并发送dump指令之后,两端的处理流程为:
Master端:
启动Dump Thread根据Slave发送过来需要dump的文件名和位置从读取binlog数据并发送给Slave的IO Thread。当没有更新日志,该线程将阻塞,直到有新日志事件到来将其唤醒。
Slave端:
IO Thread将接收到的日志Event写入本地relay文件,然后SQL Thread读取relay日志数据将数据反映到Slave数据库中。
从交互流程中可知,从库主动从主库请求 binlog 的副本,而不是主库主动将数据推送到从库。也就是说每个从库都是独立地与主库进行连接,每个从库只能通过向主库请求来接收 binlog 的副本,因此从库能够以自己的速度读取和更新数据库的副本,并且可以随意启动和停止该过程,而不会影响到主库或者其他从库的状态。以下代码(有删减)摘录自debezium项目中跟binlog交互过程:
//debezium#BinaryLogClient.java
public void connect() throws IOException, IllegalStateException {
try{
this.channel = this.openChannel();
GreetingPacket greetingPacket = this.receiveGreeting();
(new Authenticator(greetingPacket, this.channel, this.schema, this.username, this.password)).authenticate();
this.tryUpgradeToSSL(greetingPacket);
this.fetchBinlogFilenameAndPosition();
this.enableHeartbeat();
this.requestBinaryLogStream();
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onConnect(this);
}
this.listenForEventPackets();
}finally{
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onDisconnect(this);
}
}
}
private void listenForEventPackets() throws IOException {
ByteArrayInputStream inputStream = this.channel.getInputStream();
while(inputStream.peek() != -1) {
int packetLength = inputStream.readInteger(3);
inputStream.skip(1L);
int marker = inputStream.read();
if (marker == 254 && !this.blocking) {
completeShutdown = true;
break;
}
Event event;
try {
event = this.eventDeserializer.nextEvent(packetLength == 16777215 ? new ByteArrayInputStream(this.readPacketSplitInChunks(inputStream, packetLength - 1)) : inputStream);
this.notifyEventListeners(event);
this.updateClientBinlogFilenameAndPosition(event);
}finally {
this.disconnect();
}
}
}
}
再看看Debezium如何做故障恢复,而解释怎么做故障恢复,需要先了解其怎么表示进度信息。Source是Debezium的主要特征,基于Kafka Connector API对接上游源系统,比如Mysql、Sql Server、Postgrel和Mongodb,下游对接Kafka。其中Kafka是可选项,Debezium允许通过Embedding方式集成到应用程序,使用示例为:
// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = config.asProperties();
props.setProperty("name", "engine");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");
/* begin connector properties */
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "mysqluser");
props.setProperty("database.password", "mysqlpw");
props.setProperty("database.server.name", "my-app-connector");
props.setProperty("database.history",
"io.debezium.relational.history.FileDatabaseHistory");
props.setProperty("database.history.file.filename",
"/path/to/storage/dbhistory.dat");
// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
.using(props)
.notifying(record -> {
System.out.println(record);
}).build()) {
// Run the engine asynchronously ...
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
// Do something else or wait for a signal or an event
}
// Engine is stopped when the main code is finished
Debezium支持不同的Offset持久化方式:内存持久化和文件持久化。示例代码中配置的是基于文件的持久化:
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
其作用是将上游系统的binlog位置偏移信息保存下来,当程序异常恢复时,能基于原来的位置继续读取,避免出现数据丢失。何为偏移量?Debezium定义了一个DebeziumOffset这样的数据结构,官方解释为:
This class describes the most basic state that Debezium used for recovering based on Kafka Connect mechanism. It includes a sourcePartition and sourceOffset.
The sourcePartition represents a single input sourcePartition that the record came from (e.g. a filename, table name, or topic-partition). The sourceOffset represents a position in that sourcePartition which can be used to resume consumption of data.
主要包含两部分信息:sourcePartition和sourceOffset,两者都是Map结构,前者表示一个分区,主要是为了兼容Kafka的存储逻辑结构,在使用时候,常常对应一个固定值,比如对于Mysql数据库,partition为mysql_binlog_source,offset为binlog的文件名和pos位置以及其他可选信息,如下图:
而MS Server的offset则为这样的:
DebeziumOffset的持久化支持两者策略,AlwaysCommitOffsetPolicy和PeriodicCommitOffsetPolicy,前者每次同步binlog都进行持久化,此举是尽可能频繁地保存binlog的进度信息,后者为周期性地保存binlog进度信息,持久化频率越高,对系统性能损耗越大,因此需要结合实际情况,合理设置持久时间间隔,示例中默认1分钟一次:
props.setProperty("offset.flush.interval.ms", "60000");
接下来看看进度信息在什么情况下触发保存动作:在Debezium中,EmbeddedEngine是一个线程,其主要功能是在应用程序中,对接上游binlog数据源,在应用程序中通过自定义Consumer处理changed的,通过SourceRecord包装后的数据记录。应用程序负责所有有关容错、扩容以及如何存储上游数据消费位置和历史数据等相关问题。
#EmbeddedEngine.java
@Override
public void run() {
// Initialize the offset store ...
offsetStore.configure(workerConfig);
offsetStore.start();
RecordCommitter committer = buildRecordCommitter(offsetWriter, task, commitTimeout);
try{
while (runningThread.get() != null) {
List<SourceRecord> changeRecords = null;
changeRecords = task.poll();
changeRecords = changeRecords.stream()
.map(transformations::transform)
.filter(x -> x != null)
.collect(Collectors.toList());
handler.handleBatch(changeRecords, committer);
}
}finally {
commitOffsets(offsetWriter, commitTimeout, task);
}
}
其中handler是Debezium的默认实现:
private static ChangeConsumer buildDefaultChangeConsumer(Consumer<SourceRecord> consumer) {
return new ChangeConsumer() {
/**
* the default implementation that is compatible with the old Consumer api.
*
* On every record, it calls the consumer, and then only marks the record
* as processed when accept returns, additionally, it handles StopConnectorExceptions
* and ensures that we all ways try and mark a batch as finished, even with exceptions
* @param records the records to be processed
* @param committer the committer that indicates to the system that we are finished
*
* @throws Exception
*/
@Override
public void handleBatch(List<SourceRecord> records, DebeziumEngine.RecordCommitter<SourceRecord> committer) throws InterruptedException {
for (SourceRecord record : records) {
try {
consumer.accept(record);
committer.markProcessed(record);
}
catch (StopConnectorException | StopEngineException ex) {
// ensure that we mark the record as finished
// in this case
committer.markProcessed(record);
throw ex;
}
}
committer.markBatchFinished();
}
};
}
@Override
public synchronized void markBatchFinished() throws InterruptedException {
maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeout, task);
}
protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, Duration commitTimeout,
SourceTask task)
throws InterruptedException {
// Determine if we need to commit to offset storage ...
long timeSinceLastCommitMillis = clock.currentTimeInMillis() - timeOfLastCommitMillis;
if (policy.performCommit(recordsSinceLastCommit, Duration.ofMillis(timeSinceLastCommitMillis))) {
commitOffsets(offsetWriter, commitTimeout, task);
}
}
在maybeFlush方法中依据配置的持久化策略和持久化介质进行持久化。有了持久化,下一个问题是如何恢复持久化的位置信息?
在EmbeddedEngine的run方法中
// Initialize the offset store ...
offsetStore.configure(workerConfig);
offsetStore.start();
在引擎启动过程中初始化offsetStore并恢复数据,OffsetBackingStore是一个支持后端数据存储的接口,有3个实现实例:
比如基于文件的存储方式,在start中加载文件,恢复位置数据:
Debezium虽然能够根据消费位置从异常中恢复,但是存在数据重复处理的问题,特别是当有下游写入的时候,重复的数据又会写入下游系统。所以在应用程序中使用Debezium存在的两大问题是上游数据重复处理、下游数据重复写入。特别是如果处理支持并发,重复问题会更严重。
基于kafka方式,无论是从Source写入Kafka端还是从Sink端读取Kafka,都可以利用Kafka的Ack机制保证数据的服务质量(QoS)。为了达到读写效率最大化,生产端和消费端都支持批量Ack,但是Kafka生产端和消费端的Ack机制有所不同:消费端只有消费成功才可以Ack,而生产端提供3种Ack方式:
0 - 不保证数据成功投递,最多发送一次;
1 - 只要Kafka leader副本写入成功,即Ack;
-1 - 当Kafka全部副本写入成功才Ack
应用Kafka作为中间缓存,增大了数据存储开销和系统运维成本。另外,采用Kafka作为缓冲隐藏了很多问题,比如Source通过什么服务等级写入Kafka,Source如何维护数据位置等状态信息,采用Kafka作为缓存的框架有Canal等。同时下游消费Kafka同样需要维护Offset,Kafka的生产者又需要维护binlog的offset,维护两道offset门槛维护加强了系统脆弱性和故障率,基于Flink和Debezium的方案能够优化掉Kafka,如果此时仅仅作为一种Sink选项,将Kafka作为我们的Sink(当然还可以用其他的Sink,比如Mysql),此时的模型图如下:
相比基于Kafka作为缓存,同时又作为Sink,减少了一道维护Offset的门槛,如果换做其他类型的Sink,此时完全可以摆脱对Kafka的依赖。而且此时的模型中,数据库的binlog作为Source,Kafka作为Sink,Source、Transformation和Sink都可以被集成到一个框架内如Flink Connector,下面着重来看该方案如何解决一些关键问题的。
Flink如何提高数据同步并发性能?
单纯使用Debezium不容易实现扩容方案,而借助Flink可以让Debezium如虎添翼。在介绍如何实现扩容前,先了解下Flink的Job调度模型。
在Flink中,Job的拓扑结构有两种视图:Job逻辑视图和Job执行视图。如下图所示,一个作业逻辑:soure=>map=>keyBy|window|apply=>sink,其中keyBy|window|apply组成一个管道,在一个任务中运行。
要提高Flink应用程序的并行度,可以借助slot和parallelism这两个概念来实现。其中slot是资源组的抽象,是资源分配的单位,(pipelined)任务就运行在slot里面,slot代表了taskmanager的并发执行能力,比如下图中配置了每个TaskManager的slot为3,那么3个taskmanager一共有9个TaskSlot,意味着该Flink集群最大并行任务数量为9:
任务的并行度可以通过parallelism来设置,最大不能超过slot数量。运行程序默认的并行度为1,比如下面图中,9个TaskSlot只用了1个,有8个空闲,因此只有设置合适的并行度才能提高集群的利用率。
parallelism是可配置、可指定的,下面列举了常用方法:
1.可以通过修改$FLINK_HOME/conf/flink-conf.yaml文件的方式更改并行度
2.可以通过设置$FLINK_HOME/bin/flink 的-p参数修改并行度
3.可以通过设置executionEnvironmentk的方法修改并行度
4.可以通过设置flink的编程API修改过并行度
5.这些并行度设置优先级从低到高排序,排序为api>env>p>file.
6.设置合适的并行度,能提高运算效率
7.parallelism不能多于slot个数。
回到我们实时数据同步的场景中,原来单纯基于Debezium的模型,在并行模式下,模型变成下面的样子:
图中source的默认并行度为1,因为对数据库binlog并行支持,不同的数据库版本支持的并行策略不一样,来看看Mysql系列不同数据库版本的并行复制技术:
Mysql 5.6 按库并行复制,不同的库binlog互不影响,可以并行读取;
MariaDB 支持组提交的并行优化技术,因为一组提交的事务有一个相同的 commit_id 直接写到 binlog 里,能够在同一组里提交的事务一定不会修改同一行;
MySQL 5.7 根据同时进入 prepare 和 commit 来判断是否可以并行的策略,其依据的原则为:
同时处于prepare状态的事务,在备库执行时是可以并行的;
处于prepare状态的事务与处于commit状态之间的事务,在备库执行时也是可以并行的。
binlog_group_commit_sync_delay 和 bin_log_group_commit_sync_no_delay_count 这两个参数可以用来拉长 binlog write 和 fsync 之间的时间,以此减少binlog的写盘次数,制造更多同时处于 prepare 阶段的事务,从而增加备库复制的并行度。前者表示延迟多少微秒后才调用fsync,后者表示累积多少次以后才调用fsync。
MySQL 5.7.22 基于WRITESET的并行复制,增加了一个参数binlog-transaction-dependency-tracking,用来控制是否启用这个策略。这个参数的可选值有以下3种:
COMMIT_ORDER,表示的就是前面介绍的,根据同时进入prepare和commit来判断是否可以并行的策略。
WRITESET,表示的是对于事务涉及更新的每一行,计算出这一行的hash值,组成集合writeset。如果两个事务没有操作相同的行,也就是说它们的writeset没有交集,就可以并行。
WRITESET_SESSION,是在WRITESET的基础上多了一个约束,即在主库上同一个线程先后执行的两个事务,在备库执行的时候,要保证相同的先后顺序。
Flink如何保证数据一致性?
在上图中,有3个地方会存在异常导致的数据丢失和重复问题:Source端读取binlog时发生异常、Flink中间处理过程中发生异常\\Sink端写入持久化时发生异常,为了解决上述问题,Flink引入了Checkpoint机制,所谓Checkpoint就是定时打快照,通过JobManager的CheckPointCoordinator向所有SourceTask 注入barrier,然后SourceTask再向下游广播barrier,一直到SinkTask接收到barrier,这个过程当中任意一个Task收到barrier并完成本地快照后向CheckPointCoordinator注册备份数据的地址(state handle),当CheckPointCoordinator收到所有Task的数据备份地址,一次分布式快照才算完成。Flink采用的是轻量级分布式快照算法,步骤为:
Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint,在数据流中安插CheckPoint barrier。
source 节点收到barrier后向下游广播 barrier,并将自己的状态(异步)写入到持久化存储中。当 task 完成 state 备份后,会将备份数据的地址(state handle)通知给 Checkpoint coordinator。
当下游Task收到所有的barrier后,继续向自己的下游传递barrier,然后自身执行快照,并将自己的状态异步写入到持久化存储中。如果下游任务为Sink节点,barrier传递终止。
当 Checkpoint coordinator 收集齐所有 task 的 state handle,这一次的 Checkpoint 全局完成了,并向持久化存储中备份Checkpoint meta 文件,每次checkpoint生成不同的元数据文件目录。
下图展示一次checkpoint过程完成后保存的算子状态:
如果CheckPoint的持续时长超过 了CheckPoint设定的超时时间,CheckPointCoordinator 还没有收集完所有的 State Handle,CheckPointCoordinator就会认为本次CheckPoint失败。下面的代码展示了有关Checpoint的配置属性:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//Sets the checkpointing mode (exactly-once vs. at-least-once).
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//Sets the interval in which checkpoints are periodically scheduled.
checkpointConfig.setCheckpointInterval(flinkConfig.interval);
// 两个checkpoints之间最少间隔
checkpointConfig.setMinPauseBetweenCheckpoints(flinkConfig.pauseInterval);
checkpointConfig.setTolerableCheckpointFailureNumber(10);
checkpointConfig.setCheckpointTimeout(flinkConfig.checkPointTimeout);
// env.setStateBackend(new FsStateBackend(flinkConfig.checkPointDir));
checkpointConfig.setCheckpointStorage(flinkConfig.checkPointDir);
// 取消作业时是否保留 checkpoint (默认不保留)
checkpointConfig.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Flink框架本身支持Operator级别的State快照,所谓State就是Flink处理过程中的状态数据,有keyed state和operator state两种,前者应用于 KeyedStream的函数与操作中,后者与一个 operator 的实例绑定。 这些状态信息通过定时生成的Checkpoint保存到内存或者持久化系统中,当任务失败重试时从持久化系统中恢复数据。Flink的State后端存储持久化方式有3种:MemoryStateBackend 、FsStateBackend和RocksDBStateBackend,其中 MemoryStateBackend 和 FsStateBackend 在运行时都是存储在 java heap 中的,只有在执行 Checkpoint 时,FsStateBackend 才会将数据以文件格式持久化到远程存储上,而 RocksDBStateBackend 借用了 RocksDB(内存磁盘混合的 LSM DB)对 state 进行存储。Flink框架对CheckpointableKeyedStateBackend和OperatorStateBackend分别实现了快照功能,在AbstractStreamOperator 的snapshot方法中有代码为证:
streamOperator.snapshotState(snapshotContext);
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
snapshotInProgress.setOperatorStateRawFuture(
snapshotContext.getOperatorStateStreamFuture());
if (null != operatorStateBackend) {
snapshotInProgress.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(
checkpointId, timestamp, factory, checkpointOptions));
}
if (null != keyedStateBackend) {
if (checkpointOptions.getCheckpointType().isSavepoint()) {
SnapshotStrategyRunner<KeyedStateHandle, ? extends FullSnapshotResources<?>>
snapshotRunner = prepareSavepoint(keyedStateBackend, closeableRegistry);
snapshotInProgress.setKeyedStateManagedFuture(
snapshotRunner.snapshot(
checkpointId, timestamp, factory, checkpointOptions));
} else {
snapshotInProgress.setKeyedStateManagedFuture(
keyedStateBackend.snapshot(
checkpointId, timestamp, factory, checkpointOptions));
}
}
除了Flink本身对上述两种State类型的快照存储,也支持自定义的快照执行逻辑,特别值得一提的是Flink对除了Source和Sink外的中间算子能够做到Exactly Once,Source和Sink需要外部系统本身的支持,自定义快照执行需要算子实现CheckpointedFunction和CheckpointListener接口,比如Debezium用来实现binlog读取的DebeziumSourceFunction,其部分实现代码为:
public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
/**
* This method is called when the parallel function instance is created during distributed
* execution. Functions typically set up their state storing data structures in this method.
***/
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
OperatorStateStore stateStore = context.getOperatorStateStore();
this.offsetState =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
for (byte[] serializedOffset : offsetState.get()) {
if (restoredOffsetState == null) {
restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
}
}
/**
* This method is called when a snapshot for a checkpoint is requested. This acts as a hook to
* the function to ensure that all state is exposed by means previously offered through {@link
* FunctionInitializationContext} when the Function was initialized, or offered now by {@link
* FunctionSnapshotContext} itself.**
*/
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
offsetState.clear();
final DebeziumChangeFetcher<?> fetcher = this.debeziumChangeFetcher;
byte[] currentState = fetcher.snapshotCurrentState();
offsetState.add(currentState);
pendingOffsetsToCommit.put(checkpointId, serializedOffset);
while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
pendingOffsetsToCommit.remove(0);
}
}
/**
* Notifies the listener that the checkpoint with the given {@code checkpointId} completed and
* was committed.
*/
@Override
public void notifyCheckpointComplete(long checkpointId) {
final DebeziumChangeFetcher<T> fetcher = this.debeziumChangeFetcher;
final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
byte[] serializedOffsets = (byte[]) pendingOffsetsToCommit.remove(posInMap);
// remove older checkpoints in map
for (int i = 0; i < posInMap; i++) {
pendingOffsetsToCommit.remove(0);
}
DebeziumOffset offset =
DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
changeConsumer.commitOffset(offset);
}
}
下面看看针对这3种情况,Flink如何解决的?
1. Source端读取binlog时发生异常
如果Debezium在初始化快照时无法进行Checkpoint,假如此时发生异常,待任务恢复时候会重新全表扫描数据,存在数据重复问题。如果Debezium在增量读取binlog时发生异常,由于Debezium 通过自定义持久化策略保存offset信息,如果发生异常,可以恢复到上一次快照持久化时的读取位置,此时不会存在丢失,只是从上一次offset处重新读取一遍数据。
2. Flink中间处理过程中发生异常
在Flink中间处理过程中发生异常恢复时,keyed state和operator state 从持久化系统中加载数据,恢复中间过程数据。对于继承AbstractUdfStreamOperator的函数算子也可以基于UDF自定义快照恢复。
3. Sink端写入持久化时发生异常
Sink端写入持久化时发生异常,在恢复时由于上游Source恢复时存在重复,写入到Sink也可能会导致重复,这取决于Sink端系统是否支持幂等或者事务。如果Sink支持事务,在写入过程发生异常,此时事务不会提交,如果Sink不支持事务,当任务恢复时,会导致数据重复。Kafka通过两阶段提交来保证事务性写入,下面详细介绍。
Flink-Kafka如何通过两阶段提交保证事务?
对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。DataStream API 提供了 GenericWriteAheadSink 模板类和TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。
预写日志(Write-Ahead-Log,WAL)
把结果数据先当成状态保存,然后在收到 checkpoint 完成的通知的时候,一次性写入 sink 系统
简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么 sink 系统,都能用这种方式一批搞定
DataStream API 提供了一个模板类:GenericWriteAheadSink,来实现这种事务性 sink
两阶段提交(Two-Phase-Commit,2PC)
对于每个 checkpoint,sink 任务会启动一个事务,并将接下来所有接收的数据添加到事务里
然后将这些数据写入外部的 sink 系统,但不提交它们 ----- 这时只是“预提交”
当它收到 checkpoint 完成的通知时,它才正式提交事务,实现结果的真正写入
这种方式真正实现了 exactly-once ,它需要一个提供事务支持的外部 sink 系统。Flink 提供了 TwoPhaseCommitSinkFunction 接口。
TwoPhaseCommitSinkFunction 也实现了CheckpointedFunction和CheckpointListener接口,除了具有这两个接口中的方法外,额外增加了以下几个方法:
public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN>
implements CheckpointedFunction, CheckpointListener {
/**
* Method that starts a new transaction.
*
* @returnnewly created transaction.
*/
protected abstract TXN beginTransaction() throws Exception;
/**
* Pre commit previously created transaction. Pre commit must make all of the necessary steps to
* prepare the transaction for a commit that might happen in the future. After this point the
* transaction might still be aborted, but underlying implementation must ensure that commit
* calls on already pre committed transactions will always succeed.
*
* <p>Usually implementation involves flushing the data.
*/
protected abstract void preCommit(TXN transaction) throws Exception;
/**
* Commit a pre-committed transaction. If this method fail, Flink application will be restarted
* and {@linkTwoPhaseCommitSinkFunction#recoverAndCommit(Object)} will be called again for the
* same transaction.
*/
protected abstract void commit(TXN transaction);
/** Abort a transaction. */
protected abstract void abort(TXN transaction);
}
然后看看这几个方法的调用逻辑:
public class FlinkKafkaProducer<IN>
extends TwoPhaseCommitSinkFunction<
IN,
FlinkKafkaProducer.KafkaTransactionState,
FlinkKafkaProducer.KafkaTransactionContext> {
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
long checkpointId = context.getCheckpointId();
preCommit(currentTransactionHolder.handle);
pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
state.clear();
state.add(
new State<>(
this.currentTransactionHolder,
new ArrayList<>(pendingCommitTransactions.values()),
userContext));
}
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
while (pendingTransactionIterator.hasNext()) {
Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
Long pendingTransactionCheckpointId = entry.getKey();
TransactionHolder<TXN> pendingTransaction = entry.getValue();
if (pendingTransactionCheckpointId > checkpointId) {
continue;
}
commit(pendingTransaction.handle);
pendingTransactionIterator.remove();
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
state = context.getOperatorStateStore().getListState(stateDescriptor);
boolean recoveredUserContext = false;
if (context.isRestored()) {
//执行恢复提交或Abort等操作
}
currentTransactionHolder = beginTransactionInternal();
}
}
下面以从Kafka Source拉取数据,经过一次窗口聚合,最后将数据发送到Kafka Sink这样一个场景,来描述两阶段事务提交的工作流程:
JobManager向Source发送Barrier,开始进入pre-Commit阶段,当只有内部状态时,pre-commit阶段无需执行额外的操作,仅仅是写入一些已定义的状态变量即可。当chckpoint成功时Flink负责提交这些写入,否则就终止取消掉它们。
当Source收到Barrier后,将自身的状态进行保存,后端可以根据配置进行选择,这里的状态是指消费的每个分区对应的offset。然后将Barrier发送给下一个Operator。
当Window这个Operator收到Barrier之后,对自己的状态进行保存,这里的状态是指聚合的结果(sum或count的结果),然后将Barrier发送给Sink。Sink收到后也对自己的状态进行保存,之后会进行一次预提交。
预提交成功后,JobManager通知每个Operator,这一轮检查点已经完成,这个时候,Kafka Sink会向Kafka进行真正的事务Commit。
两阶段提交的缺点:
1.同步阻塞问题。执行过程中,所有参与节点都是事务阻塞型的。
当参与者占有公共资源时,其他第三方节点访问公共资源不得不处于阻塞状态。
2.单点故障。由于协调者的重要性,一旦协调者发生故障。
参与者会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题)
3.数据不一致。在二阶段提交的阶段二中,当协调者向参与者发送commit请求之后,发生了局部网络异常或者在发送commit请求过程中协调者发生了故障,这回导致只有一部分参与者接受到了commit请求。
而在这部分参与者接到commit请求之后就会执行commit操作。但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。
可见,Flink作为实时数据处理的利器,一方面在伸缩性、容错性和一致性方面解放了开发人员,另一方面也承担了Sink下游存储系统应该承担的部分职责(一致性保障),将这些重任自己抗下,背负了过多的负担,然而并没有完全解决问题,还引入了复杂性和更多的性能问题。部分一致性问题仍然需要开发人员和上下游端系统来保证。从解决问题的途径来看,相对端系统来说,Flink只是在应用端打补丁,并没有触及到端系统,因此长远来看,问题的最终解决可能还需要上下游端系统来保障。
以上是关于基于Flink的实时数据同步原理的主要内容,如果未能解决你的问题,请参考以下文章
Flink 实战系列Flink CDC 实时同步 Mysql 全量加增量数据到 Hudi
Flink 实战系列Flink CDC 实时同步 Mysql 全量加增量数据到 Hudi