Basic Usage of Debezium (Using MySQL as an Example)

Posted by 老叶茶馆


  • Introduction to Debezium

  • Basic usage

    • Preparing MySQL

    • Writing the program

    • Testing

  • Summary


I. Introduction to Debezium

From the official website:

Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.

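Put simply, Debezium captures every row-level change in a database and emits those changes as an event stream, in the order in which they occurred.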
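To make the idea of an ordered change event stream concrete, here is a minimal, self-contained sketch in plain Java (no Debezium dependency) of a consumer that mirrors a table in memory by applying change events one at a time. The TableReplica and Event names are hypothetical, and Event is a deliberate simplification of Debezium's event envelope: it keeps only the operation code (c for insert, u for update, d for delete, r for a row read during the initial snapshot), the primary key, and the row's value after the change.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal sketch, not Debezium code: mirrors a table (id -> name) in memory by
// applying simplified change events in the order in which they arrive.
public class TableReplica {

    // Hypothetical, simplified stand-in for one change event on a table with columns (id, name).
    public static final class Event {
        final String op;    // "r" = snapshot read, "c" = insert, "u" = update, "d" = delete
        final long id;      // primary key of the changed row
        final String after; // "name" after the change; null for deletes

        public Event(String op, long id, String after) {
            this.op = op;
            this.id = id;
            this.after = after;
        }
    }

    private final Map<Long, String> rows = new TreeMap<>();

    // Applies a single event to the in-memory copy.
    public void apply(Event e) {
        switch (e.op) {
            case "r":
            case "c":
            case "u":
                rows.put(e.id, e.after); // the after image is the row's new state
                break;
            case "d":
                rows.remove(e.id);       // the row no longer exists
                break;
            default:
                throw new IllegalArgumentException("unexpected op: " + e.op);
        }
    }

    public Map<Long, String> rows() {
        return rows;
    }

    public static void main(String[] args) {
        List<Event> stream = Arrays.asList(
                new Event("c", 1, "a"),
                new Event("c", 2, "b"),
                new Event("u", 1, "a2"),
                new Event("d", 2, null));
        TableReplica replica = new TableReplica();
        stream.forEach(replica::apply);
        System.out.println(replica.rows()); // prints {1=a2}
    }
}
```

Because each event describes only a single row change, the final state depends on applying the events in the order they occurred: replaying the delete of row 2 before its insert would leave row 2 in the copy. That is why the ordering guarantee quoted above matters to consumers.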

II. Basic Usage

The following walks through the basic usage of Debezium with MySQL.

1. Preparing MySQL

  1. Create a MySQL user with the required privileges, for example:

CREATE USER 'dbz'@'%' IDENTIFIED BY '******';

-- The user already exists, so GRANT needs no IDENTIFIED BY clause (MySQL 8.0 rejects it in GRANT)
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%';
  2. Check whether binary logging (log-bin) is enabled:

SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';

-- If you get the error "The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled...",
-- run the following statement and then rerun the query above: set global show_compatibility_56=on;
-- Alternatively (works on both MySQL 5.7 and 8.0): SHOW GLOBAL VARIABLES LIKE 'log_bin';

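If the result is OFF, enable binary logging in the MySQL configuration file (for example in the [mysqld] section of my.cnf) with settings like the following, then restart MySQL: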

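[mysqld]
server-id         = 223344     # required
log_bin           = mysql-bin  # base name for the sequence of binlog files
binlog_format     = ROW        # must be ROW
binlog_row_image  = FULL       # must be FULL
expire_logs_days  = 10         # set according to your needs (MySQL 8.0 prefers binlog_expire_logs_seconds)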
  3. Prepare the database and table:

create database inventory;
create table inventory.a (id bigint primary key auto_increment, name varchar(32));
insert into inventory.a values (null, 'n1'),(null, 'n2'),(null, 'n3');
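If you prepare many MySQL instances, the binlog settings listed above can also be checked mechanically before pointing Debezium at a server. The following is a minimal sketch, not part of Debezium or MySQL: the hypothetical BinlogConfigCheck class reads the lines of a [mysqld] section (simple key = value lines with optional # comments) and reports any required setting that is missing or has the wrong value. It knows nothing about server defaults or included option files, so treat it as a quick sanity check; SHOW GLOBAL VARIABLES on the running server remains the authoritative answer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Debezium or MySQL): sanity-checks the binlog
// settings of a [mysqld] option-file section before pointing Debezium at the server.
public class BinlogConfigCheck {

    // Parses "key = value  # comment" lines; bare flags such as "log-bin" get an empty value.
    // Keys are normalized so that "server-id" and "server_id" are treated the same.
    public static Map<String, String> parse(List<String> lines) {
        Map<String, String> settings = new HashMap<>();
        for (String raw : lines) {
            String line = raw;
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash); // drop the trailing comment
            }
            line = line.trim();
            if (line.isEmpty() || line.startsWith("[")) {
                continue; // skip blank lines and section headers
            }
            int eq = line.indexOf('=');
            String key = (eq < 0 ? line : line.substring(0, eq)).trim();
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            settings.put(key.replace('-', '_').toLowerCase(), value);
        }
        return settings;
    }

    // Returns the problems found; an empty list means the required settings are present.
    public static List<String> check(List<String> lines) {
        Map<String, String> s = parse(lines);
        List<String> problems = new ArrayList<>();
        if (!s.containsKey("server_id")) {
            problems.add("server-id is missing");
        }
        if (!s.containsKey("log_bin")) {
            problems.add("log_bin is missing");
        }
        if (!"ROW".equalsIgnoreCase(s.get("binlog_format"))) {
            problems.add("binlog_format must be ROW");
        }
        if (!"FULL".equalsIgnoreCase(s.get("binlog_row_image"))) {
            problems.add("binlog_row_image must be FULL");
        }
        return problems;
    }

    public static void main(String[] args) {
        List<String> cnf = List.of(
                "[mysqld]",
                "server-id         = 223344     # required",
                "log_bin           = mysql-bin",
                "binlog_format     = ROW",
                "binlog_row_image  = FULL",
                "expire_logs_days  = 10");
        System.out.println(check(cnf)); // prints []
    }
}
```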

2. Writing the Program

2.1. Project dependencies (Maven)

pom.xml

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>

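At the time of writing, the latest stable Debezium release is 1.9.5.Final; set the version.debezium property in pom.xml accordingly. The sample output in the test section was produced with 1.8.1.Final, and the configuration in this article uses Debezium 1.x property names.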

2.2. Prepare the database and table

Use the inventory database and table a created in the MySQL preparation step above; they do not need to be created again.

2.3. Writing the code

package com.greatdb.dbzdemo;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

/**
 * @version 1.0
 * @date 2022/07/29
 */
public class DebeziumTest {

    private static DebeziumEngine<ChangeEvent<String, String>> engine;

    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.setProperty("name", "dbz-engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

        // offset config begin - store the already-processed binlog offsets in a file
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/dbz/storage/mysql_offsets.dat");
        props.setProperty("offset.flush.interval.ms", "0");
        // offset config end

        // Debezium 1.x property names (2.x renamed e.g. database.server.name to topic.prefix
        // and database.history.* to schema.history.internal.*)
        props.setProperty("database.server.name", "mysql-connector");
        props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
        props.setProperty("database.history.file.filename", "/tmp/dbz/storage/mysql_dbhistory.txt");

        props.setProperty("database.server.id", "122112"); // must differ from the MySQL server's server-id
        props.setProperty("database.hostname", "tmg");
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "dbz");          // the user created in the MySQL preparation step
        props.setProperty("database.password", "******");
        props.setProperty("database.include.list", "inventory"); // database(s) to capture
        props.setProperty("table.include.list", "inventory.a");  // table(s) to capture

        props.setProperty("snapshot.mode", "initial"); // snapshot the existing rows first, then stream binlog changes

        // Create the Debezium engine with the settings above; events are serialized as JSON strings
        engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> {
                    System.out.println(record); // print each change event to the console
                })
                .using((success, message, error) -> {
                    if (error != null) {
                        // completion callback: report the error that stopped the engine
                        System.out.println("------------error, message:" + message + " exception:" + error);
                    }
                    closeEngine(engine);
                })
                .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        addShutdownHook(engine);
        awaitTermination(executor);

        System.out.println("------------main finished.");
    }

    private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) {
        try {
            engine.close();
        } catch (IOException ignored) {
        }
    }

    private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));
    }

    private static void awaitTermination(ExecutorService executor) {
        if (executor != null) {
            try {
                executor.shutdown();
                while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    // keep waiting until the engine stops (e.g. via the shutdown hook)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

3. Testing

Once the program is running, the console prints output like the following:

...(omitted)
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":1}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"n1"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005186,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005191,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=1,name=n1},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005186,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005191}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":2}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"n2"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005195,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=2,name=n2},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005195,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":3}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":3,"name":"n3"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005196,"snapshot":"last","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=3,name=n3},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005196,snapshot=last,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
...(omitted)

All existing rows have been emitted as snapshot events. The key parts are:

..."payload":{"before":null,"after":{"id":1,"name":"n1"}..."op":"r"...
..."payload":{"before":null,"after":{"id":2,"name":"n2"}..."op":"r"...
..."payload":{"before":null,"after":{"id":3,"name":"n3"}..."op":"r"...
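Each printed record is a ChangeEvent whose value() is the JSON string shown above, and the payload's op field says what kind of change it carries: r for rows read during the snapshot, and c, u and d for inserts, updates and deletes in the steps that follow. Here is a minimal sketch for a quick experiment, assuming you only need the op code: the hypothetical OpCodes class pulls it out with a regular expression. A real consumer should parse the JSON with a proper JSON library (for example Jackson) instead of a regex.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for this demo (not a Debezium API): reads the "op" code from the
// JSON string returned by ChangeEvent.value() and turns it into a readable label.
public class OpCodes {

    // Matches the payload's "op":"x" entry. In the schema part, "op" only appears as
    // "field":"op" (followed by '}', not ':'), so it is not matched.
    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    public static Optional<String> opCode(String valueJson) {
        if (valueJson == null) {
            return Optional.empty(); // e.g. a tombstone record has no value
        }
        Matcher m = OP.matcher(valueJson);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static String describe(String op) {
        switch (op) {
            case "r": return "read (snapshot)";
            case "c": return "create (insert)";
            case "u": return "update";
            case "d": return "delete";
            default:  return "other (" + op + ")";
        }
    }

    public static void main(String[] args) {
        String value = "{\"payload\":{\"before\":null,\"after\":{\"id\":1,\"name\":\"n1\"},\"op\":\"r\"}}";
        opCode(value).ifPresent(op -> System.out.println(describe(op))); // prints read (snapshot)
    }
}
```

Inside the engine, the same call could be made from the notifying callback, e.g. `record -> OpCodes.opCode(record.value()).ifPresent(op -> System.out.println(OpCodes.describe(op)))`, instead of printing the whole record.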
  • Next, insert a row:

insert into inventory.a values (4, 'n4');

Console output:

..."payload":{"before":null,"after":{"id":4,"name":"n4"}..."op":"c"...
  • Update a row:

update inventory.a set name = 'n4-upd' where id = 4;

Console output:

..."payload":{"before":{"id":4,"name":"n4"},"after":{"id":4,"name":"n4-upd"}..."op":"u"...
  • Delete a row:

delete from inventory.a where id = 1;

Console output:

..."payload":{"before":{"id":1,"name":"n1"},"after":null..."op":"d"...
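The update and delete events above carry both a before and an after image of the row (complete images, because binlog_row_image is FULL). Here is a minimal sketch of how a consumer might use them to find which columns an update actually changed. It assumes the two images have already been parsed from the JSON into Map<String, Object> (the parsing step is left out), and the RowDiff name is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper (not a Debezium API): compares the "before" and "after" row images
// of an update event and reports the columns whose values differ.
public class RowDiff {

    // Returns column -> {old value, new value} for every column whose value changed.
    // Columns present only in "before" are not reported; with full row images both maps
    // contain the same columns.
    public static Map<String, Object[]> changedColumns(Map<String, Object> before, Map<String, Object> after) {
        Map<String, Object[]> changes = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : after.entrySet()) {
            Object old = before.get(e.getKey());
            if (!Objects.equals(old, e.getValue())) {
                changes.put(e.getKey(), new Object[] {old, e.getValue()});
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        // The update from this section: name changed from n4 to n4-upd
        Map<String, Object> before = Map.of("id", 4L, "name", "n4");
        Map<String, Object> after = Map.of("id", 4L, "name", "n4-upd");
        changedColumns(before, after).forEach((col, v) ->
                System.out.println(col + ": " + v[0] + " -> " + v[1])); // prints name: n4 -> n4-upd
    }
}
```

For a delete event the after image is null, so a consumer would typically branch on the op code first and use only the before image (for example its primary key) in that case.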

III. Summary

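Using MySQL as an example, this article showed the basic workflow for using Debezium from code. When rows in MySQL are inserted, updated or deleted, Debezium captures each row-level change, records the row's data before and after the change, and exposes the changes as an event stream that external code can consume and process.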

Reference: https://debezium.io/documentation/reference/1.8/index.html

Enjoy GreatSQL :)


