Debezium 没有为 mysql 提供嵌入式版本的 CDC

Posted

技术标签:

【中文标题】Debezium 没有为 mysql 提供嵌入式版本的 CDC【英文标题】:Debezium is not giving the CDC for Embedded version for mysql 【发布时间】:2019-04-30 04:18:15 【问题描述】:

我正在使用以下依赖项,

        <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-oracle</artifactId>
        <version>$version.debezium</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/io.debezium/debezium-connector-mysql -->
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-mysql</artifactId>
        <version>$version.debezium</version>
    </dependency>

<version.debezium>0.8.3.Final</version.debezium>

下面是我的java方法,

public void runMysqlParsser() 

    Configuration config = Configuration.create()
            /* begin engine properties */
            .with("connector.class",
                    "io.debezium.connector.mysql.MySqlConnector")
            .with("offset.storage",
                    "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with("offset.storage.file.filename",
                    "/home/mohit/tmp/offset.dat")
            .with("offset.flush.interval.ms", 60000)
            /* begin connector properties */
            .with("name", "my-sql-connector")
            .with("database.hostname", "localhost")
            .with("database.port", 3306)
            .with("database.user", "root")
            .with("database.password", "root")
            .with("server.id", 1)
            .with("database.server.name", "my-app-connector")
            .with("database.history",
                    "io.debezium.relational.history.FileDatabaseHistory")
            .with("database.history.file.filename",
                    "/home/mohit/tmp/dbhistory.dat")
            .with("database.whitelist", "mysql")
            .with("table.whitelist", "mysql.customers")
            .build();
    EmbeddedEngine engine = EmbeddedEngine.create()
            .using(config)
            .notifying(this::handleEvent)
            .build();
    Executor executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);


    private void handleEvent(SourceRecord sourceRecord) 
    try 
        LOG.info("Got record :" + sourceRecord.toString());
     catch (Exception ex) 
        LOG.info("exception in handle event:" + ex);
    

我的 sql 配置, .

general_log_file = /var/log/mysql/mysql.log
general_log = 1
server-id               = 1
log_bin                 = /var/log/mysql/mysql-bin.log
expire_logs_days        = 10
max_binlog_size   = 100M
binlog_format     = row
binlog_row_image  = full
binlog_rows_query_log_events = on
gtid_mode        =  on
enforce_gtid_consistency   = on

当我运行这段代码时,我得到了历史日志的偏移量,mysql.log 文件也得到了添加到它的偏移量。但是,当我对表执行任何更新语句时,它没有给我任何日志,即没有调用 handleEvent 方法。谁能告诉我代码或配置有什么问题?

下面是运行java代码后的日志,

$$ java -jar debezium-gcp-1.0-SNAPSHOT-jar-with-dependencies.jar 

log4j:WARN 找不到记录器 (org.apache.kafka.connect.json.JsonConverterConfig) 的附加程序。 log4j:WARN 请正确初始化log4j系统。

log4j:WARN 请参阅http://logging.apache.org/log4j/1.2/faq.html#noconfig 了解更多信息。 2018 年 11 月 28 日下午 1:29:47 com.debezium.gcp.SampleMysqlEmbededDebezium handleEvent 信息:得到记录:SourceRecordsourcePartition=server=my-app-connector,sourceOffset=file=mysql-bin.000002,pos=980,gtids=31b708c7-ee22-11e8-b8a3-080027fbf50e:1-17, snapshot=true ConnectRecordtopic='my-app-connector', kafkaPartition=0, key=StructdatabaseName=, value=Structsource=Structversion=0.8.3.Final,name=my-app -connector,server_id=0,ts_sec=0,file=mysql-bin.000002,pos=980,row=0,snapshot=true,databaseName=,ddl=SET character_set_server=latin1, collat​​ion_server=latin1_swedish_ci;, timestamp=空,标头=连接标头(标头=) 2018 年 11 月 28 日下午 1:29:47 com.github.shyiko.mysql.binlog.BinaryLogClient 连接 信息:在 31b708c7-ee22-11e8-b8a3-080027fbf50e:1-17 (sid:6326, cid:21) 连接到 localhost:3306

【问题讨论】:

【参考方案1】:

您是否将正确的数据库/表列入白名单?

你能看看这个演示吗 - https://github.com/debezium/debezium-examples/tree/master/kinesis 只需删除 Kinesis 相关代码并仅打印到控制台即可。 还要检查table.ignore.builtin 配置选项。恕我直言mysql 数据库属于内置数据库,默认被过滤掉。

【讨论】:

是的@Jiri Pechanec。该表是 mysql.customers 。

以上是关于Debezium 没有为 mysql 提供嵌入式版本的 CDC的主要内容,如果未能解决你的问题,请参考以下文章

无法为 SqlServer 连接器设置 debezium 嵌入式引擎

Debezium 时间戳问题,无法转换为本地时区

MySQL使用Debezium更改为Kafka-仅捕获DDL stmts

如何将 Debezium SQL Server 连接器与 ksqlDB 嵌入式连接一起使用?

Debezium MySQL错误:连接密码为空

Debezium - MySQL 连接器 - Kinesis - 服务未启动