Debezium connector是怎么在Kafka Connect中跑起来的?
Posted 青草地@~~
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Debezium connector是怎么在Kafka Connect中跑起来的?相关的知识,希望对你有一定的参考价值。
主要脉络
Kafka Connect在启动的过程中,会加载connector,并读取 task config,开启task, 其中task config中有一key为”task.class”, 对于debezium mysql Connector, 它的值为”io.debezium.connector.mysql.MySqlConnectorTask”。Kafka Connect通过ClassLoader加载plugin目录下的MySqlConnectorTask类, 并newInstance创建该实例。当开启task时, 会执行MySqlConnectorTask的start方法, 该方法包含从采集snapshot到读取binlog的逻辑实现。
Kafka Connect和Debezium MysqlConnectorTask类直接相关的类
DistributedHerder类(startTask方法)
startTask方法调用Worker类的startTask方法。重要代码:
1)这段代码,会通过newInstance()创建MysqlConnectorTask类的实例:
final Task task = plugins.newTask(taskClass);
2)这段代码,通过ExecutorService的submit方法后,把workerTask线程转为可运行状态,即执行WorkerTask类的run方法:
executor.submit(workerTask);
WorkerTask抽象类(run方法) – 实施Runable接口
dorun() –>execute()
WorkerSourceTask类,从WorkerTask抽象类扩展而来
execute()方法中,重要的两行代码:
task.initialize(new WorkerSourceTaskContext(offsetReader));
task.start(taskConfig);
这行代码,和MysqlConnectorTask结合起来,即为执行至MysqlConnectorTask类的initialize(SourceTaskContext context)和start()方法。 其中initialize(SourceTaskContext context)为MysqlConnectorTask继承至父类SourceTask的方法。
其中WorkerSourceTaskContext类主要包含两个功能:a. 传参OffsetStorageReader至构造器,初始化WorkerSourceTaskContext类; b. 通过offsetStorageReader方法返回OffsetStorageReader对象。
Kafka Connect开启MySqlConnectorTask的整个链路是怎样的呢?
- bin/connect-distributed.sh
运行类:org.apache.kafka.connect.cli.ConnectDistributed
- org.apache.kafka.connect.cli.ConnectDistributed类位于connect.runtime模块下。依次实现如下的主要逻辑:
- 读取properties配置
- 读取plugin
- 创建Jetty Server
- 创建offsetBackingStore, 并进行设置:1. 从config中获取topic名称:offset.storage.topic, 这个offset对于debezium mysql, 存储了binlog这些信息: file/gtids/pos等。2. 创建topic: offset.storage.topic 3. 基于offset.storage.topic, 创建KafkaBaseLog对象, 该对象在start方法中创建线程,poll offset.storage.topic。
- 传入offsetBackingStore/pugin等参数, 创建Worker对象
- 创建StatusBackingStore,并进行设置: 1. 从config中获取topic名称: status.storage.topic 2. 创建topic: status.storage.topic 3. 基于status.storage.topic, 创建KafkaBaseLog对象
- 创建ConfigBackingStore,并进行设置: 1. 从config中获取topic名称: config.storage.topic 2. 创建topic: config.storage.topic 3. 基于config.storage.topic, 创建KafkaBaseLog对象
- 基于config/statusBackingStore/configBackingStore等,创建herder
connect.start()
说明:1. (herder.start())start DistributedHerder线程,使之进入可运行状态, 线程run方法的任务, 包含如下: - worker.start(), 内含offsetBackingStore.start(); - statusBackingStore.start(); - configBackingStore.start(); 2. start jetty server: rest.start(herder), 通过Jersey的ResourceConfig,创建REST API - 通过REST API, 可查看connectors; - 通过REST API, 可创建Connector; - 调用REST API, 会调用herder进行一系列操作。
MysqlConnectorTask类的关键代码
往链式ChainedReader类中添加SnapshotReader或BinlogReader
MysqlConnectorTask下的start()方法下的代码:
1. ChainedReader链式Reader中添加SnapshotReader:
// We're supposed to start with a snapshot, so set that up ...
SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext);
if (!taskContext.getConnectorConfig().getSnapshotDelay().isZero())
// Adding a timed blocking reader to delay the snapshot, can help to avoid initial rebalancing interruptions
chainedReaderBuilder.addReader(new TimedBlockingReader("timed-blocker", taskContext.getConnectorConfig().getSnapshotDelay()));
chainedReaderBuilder.addReader(snapshotReader);
2. ChainedReader链式Reader中添加BinlogReader:
BinlogReader binlogReader = new BinlogReader("binlog", taskContext, null);
chainedReaderBuilder.addReader(binlogReader);
3. 添加其他(未知TODO)Reader:
ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(config,
taskContext,
getNewFilters(offsets, config),
serverIdGenerator);
MySqlTaskContext unifiedTaskContext = createAndStartTaskContext(config, getAllFilters(config));
// we aren't completing a snapshot, but we need to make sure the "snapshot" flag is false for this new context.
unifiedTaskContext.source().completeSnapshot();
BinlogReader unifiedBinlogReader = new BinlogReader("binlog",
unifiedTaskContext,
null,
serverIdGenerator.getConfiguredServerId());
ReconcilingBinlogReader reconcilingBinlogReader = parallelSnapshotReader.createReconcilingBinlogReader(unifiedBinlogReader);
chainedReaderBuilder.addReader(parallelSnapshotReader);
chainedReaderBuilder.addReader(reconcilingBinlogReader);
chainedReaderBuilder.addReader(unifiedBinlogReader)
链式Reader开始
MysqlConnectorTask下的start()方法中的这行代码:
this.readers.start();
这行代码,调用ChainedReader类的start方法, start方法中的startNextReader方法,将会执行SnapshotReader或BinlogReader类的start方法:
reader.start();
以SnapshotReader类为例, SnapshotReader类的父类为AbstractReader,AbstractReader的start方法:
public void start()
if (this.running.compareAndSet(false, true))
this.failure.set(null);
this.success.set(false);
doStart();
其中dostart()方法在SnapshotReader类的实施方法如下:
@Override
protected void doStart()
executorService = Threads.newSingleThreadExecutor(MySqlConnector.class, context.getConnectorConfig().getLogicalName(), "snapshot");
executorService.execute(this::execute);
其中this::execute方法,即为SnapshotReader类的execute方法, 执行snapshot,功能等同于mysqldump。
以上是关于Debezium connector是怎么在Kafka Connect中跑起来的?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用在 docker 上运行的 debezium 和 confluent-sink-connector 将所有更改从源数据库复制到目标数据库
Debezium SQL Server Source Connector 设置 Kafka 代理