Debezium connector是怎么在Kafka Connect中跑起来的?

Posted 青草地@~~

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Debezium connector是怎么在Kafka Connect中跑起来的?相关的知识,希望对你有一定的参考价值。

主要脉络

Kafka Connect在启动的过程中,会加载connector,并读取 task config,开启task, 其中task config中有一key为”task.class”, 对于debezium mysql Connector, 它的值为”io.debezium.connector.mysql.MySqlConnectorTask”。Kafka Connect通过ClassLoader加载plugin目录下的MySqlConnectorTask类, 并newInstance创建该实例。当开启task时, 会执行MySqlConnectorTask的start方法, 该方法包含从采集snapshot到读取binlog的逻辑实现。

Kafka Connect和Debezium MysqlConnectorTask类直接相关的类

DistributedHerder类(startTask方法)

startTask方法调用Worker类的startTask方法。重要代码:

1)这段代码,会通过newInstance()创建MysqlConnectorTask类的实例:

final Task task = plugins.newTask(taskClass);

2)这段代码,通过ExecutorService的submit方法后,把workerTask线程转为可运行状态,即执行WorkerTask类的run方法:

executor.submit(workerTask);

WorkerTask抽象类(run方法) – 实施Runable接口

dorun() –>execute()

WorkerSourceTask类,从WorkerTask抽象类扩展而来

execute()方法中,重要的两行代码:

task.initialize(new WorkerSourceTaskContext(offsetReader));

task.start(taskConfig);

这行代码,和MysqlConnectorTask结合起来,即为执行至MysqlConnectorTask类的initialize(SourceTaskContext context)start()方法。 其中initialize(SourceTaskContext context)MysqlConnectorTask继承至父类SourceTask的方法。

其中WorkerSourceTaskContext类主要包含两个功能:a. 传参OffsetStorageReader至构造器,初始化WorkerSourceTaskContext类; b. 通过offsetStorageReader方法返回OffsetStorageReader对象。

Kafka Connect开启MySqlConnectorTask的整个链路是怎样的呢?

- bin/connect-distributed.sh

运行类:org.apache.kafka.connect.cli.ConnectDistributed

- org.apache.kafka.connect.cli.ConnectDistributed类位于connect.runtime模块下。依次实现如下的主要逻辑:

  • 读取properties配置
  • 读取plugin
  • 创建Jetty Server
  • 创建offsetBackingStore, 并进行设置:1. 从config中获取topic名称:offset.storage.topic, 这个offset对于debezium mysql, 存储了binlog这些信息: file/gtids/pos等。2. 创建topic: offset.storage.topic 3. 基于offset.storage.topic, 创建KafkaBaseLog对象, 该对象在start方法中创建线程,poll offset.storage.topic。
  • 传入offsetBackingStore/pugin等参数, 创建Worker对象
  • 创建StatusBackingStore,并进行设置: 1. config中获取topic名称: status.storage.topic 2. 创建topic: status.storage.topic 3. 基于status.storage.topic, 创建KafkaBaseLog对象
  •  创建ConfigBackingStore,并进行设置:  1. config中获取topic名称: config.storage.topic 2. 创建topic: config.storage.topic 3. 基于config.storage.topic, 创建KafkaBaseLog对象
  • 基于config/statusBackingStore/configBackingStore等,创建herder
​​​​​​​connect.start()

说明:1. (herder.start())start DistributedHerder线程,使之进入可运行状态, 线程run方法的任务, 包含如下: - worker.start(), 内含offsetBackingStore.start(); - statusBackingStore.start(); - configBackingStore.start(); 2. start jetty server: rest.start(herder), 通过JerseyResourceConfig,创建REST API    - 通过REST API, 可查看connectors;    - 通过REST API, 可创建Connector    - 调用REST API, 会调用herder进行一系列操作。

MysqlConnectorTask类的关键代码

往链式ChainedReader类中添加SnapshotReader或BinlogReader

MysqlConnectorTask下的start()方法下的代码:

1. ChainedReader链式Reader中添加SnapshotReader:

// We're supposed to start with a snapshot, so set that up ...
SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext);

if
(!taskContext.getConnectorConfig().getSnapshotDelay().isZero())
   
// Adding a timed blocking reader to delay the snapshot, can help to avoid initial rebalancing interruptions
   
chainedReaderBuilder.addReader(new TimedBlockingReader("timed-blocker", taskContext.getConnectorConfig().getSnapshotDelay()));

chainedReaderBuilder.addReader(snapshotReader)
;

2. ChainedReader链式Reader中添加BinlogReader:

BinlogReader binlogReader = new BinlogReader("binlog", taskContext, null);

chainedReaderBuilder.addReader(binlogReader);

3. 添加其他(未知TODO)Reader:

ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(config,

        taskContext,

        getNewFilters(offsets, config),

        serverIdGenerator);



MySqlTaskContext unifiedTaskContext = createAndStartTaskContext(config, getAllFilters(config));

// we aren't completing a snapshot, but we need to make sure the "snapshot" flag is false for this new context.

unifiedTaskContext.source().completeSnapshot();

BinlogReader unifiedBinlogReader = new BinlogReader("binlog",

        unifiedTaskContext,

        null,

        serverIdGenerator.getConfiguredServerId());

ReconcilingBinlogReader reconcilingBinlogReader = parallelSnapshotReader.createReconcilingBinlogReader(unifiedBinlogReader);



chainedReaderBuilder.addReader(parallelSnapshotReader);

chainedReaderBuilder.addReader(reconcilingBinlogReader);

chainedReaderBuilder.addReader(unifiedBinlogReader)

链式Reader开始

MysqlConnectorTask下的start()方法中的这行代码:

this.readers.start();

这行代码,调用ChainedReader类的start方法, start方法中的startNextReader方法,将会执行SnapshotReader或BinlogReader类的start方法:

reader.start();

以SnapshotReader类为例, SnapshotReader类的父类为AbstractReader,AbstractReader的start方法:

public void start()
   
if (this.running.compareAndSet(false, true))
       
this.failure.set(null);
        this
.success.set(false);
       
doStart();
   

其中dostart()方法在SnapshotReader类的实施方法如下:

@Override
protected void doStart()
   
executorService = Threads.newSingleThreadExecutor(MySqlConnector.class, context.getConnectorConfig().getLogicalName(), "snapshot");
   
executorService.execute(this::execute);

其中this::execute方法,即为SnapshotReader类的execute方法, 执行snapshot,功能等同于mysqldump。

以上是关于Debezium connector是怎么在Kafka Connect中跑起来的?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用在 docker 上运行的 debezium 和 confluent-sink-connector 将所有更改从源数据库复制到目标数据库

debezium - 更改主题名称会导致错误跨数据库引用

Debezium SQL Server Source Connector 设置 Kafka 代理

meethigher-配置Debezium Connector for Oracle

Debezium 将 Avro 数据视为二进制

使用 Debezium 提取密钥的转换中不存在字段