Canal同步数据

Posted wzq_55552

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Canal同步数据相关的知识,希望对你有一定的参考价值。

canal同步数据

canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。

canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。

canal工作原理

原理相对比较简单:

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

加个canal slave 监听mysql数据变化日志,只适用mysql数据库。读取的数据可以同步到redis、其他数据库、ES等。

canal需要使用到mysql,我们需要先安装mysql, 安装mysql容器,但canal是基于mysql的主从模式实现的,所以必须先开启binlog.

开启binlog模式

binlog可以拿到mysql数据日志,canal再去获取日志信息。

先使用docker 创建mysql容器。

(1) 连接到mysql中,并修改/etc/mysql/mysql.conf.d/mysqld.cnf 需要开启主从模式,开启binlog模式。

执行如下命令,编辑mysql配置文件

命令行如下:

docker exec -it mysql /bin/bash
cd /etc/mysql/mysql.conf.d
vi mysqld.cnf

修改mysqld.cnf配置文件,添加如下配置:

上图配置如下:

二进制模式的日志目录

数据库唯一id

log-bin/var/lib/mysql/mysql-bin
server-id=12345

(2) 创建账号,用于测试使用

使用root账号创建用户并授予权限。账号canal,密码canal

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

(3)重启mysql容器

docker restart mysql

canal容器安装

下载镜像:

docker pull docker.io/canal/canal-server

容器安装

-p端口映射,-d后台运行

docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server

进入容器,修改核心配置canal.properties 和instance.properties,canal.properties 是canal自身的配置,instance.properties是需要同步数据的数据库连接配置。

执行代码如下:

docker exec -it canal /bin/bash
cd canal-server/conf/
vi canal.properties

cd example/
vi instance.properties

修改canal.properties的id,不能和mysql的server-id重复,如下图:

修改instance.properties,配置数据库连接地址:

改需要监听的数据库ip地址

配置里面有username和password,都是canal

这里的canal.instance.filter.regex有多种配置,如下:

数据库正则表达式

.*表示所有数据库

\\\\..*表示所有表

.*\\\\..*表示所有数据库的所有表都被监听

可以参考地址如下:

https://github.com/alibaba/canal/wiki/AdminGuide
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\\\) 
常见例子:
1.  所有表:.*   or  .*\\\\..*
2.  canal schema下所有表: canal\\\\..*
3.  canal下的以canal打头的表:canal\\\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\\\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)

配置完成后,设置开机启动,并记得重启canal。

exit
docker update --restart=always canal
docker restart canal

canal微服务搭建

当用户执行数据库的操作的时候,binlog 日志会被canal捕获到,并解析出数据。我们就可以将解析出来的数据进行同步到redis(也可以是另一个数据库、ES)中即可。

思路:创建一个独立的程序,并监控canal服务器,获取binlog日志,解析数据,将数据更新到redis中。这样数据就更新了。

github地址:https://github.com/wanwujiedao/spring-boot-starter-canal、

https://github.com/alibaba/canal

(1)安装辅助jar包

spring-boot-starter-canal-master中有一个工程starter-canal,它主要提供了SpringBoot环境下canal的支持,我们需要先安装该工程,在starter-canal目录下执行mvn install下载jar到本地,如下图:

(2)canal微服务工程搭建

创建service-canal工程,并引入相关配置。

pom.xml

        <!--canal依赖-->
        <dependency>
            <groupId>com.xpand</groupId>
            <artifactId>starter-canal</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

application.yml配置

server:
  port: 18082
spring:
  application:
    name: canal
#example实例,对应canal配置文件
canal:
  client:
    instances:
      example:
        host: 192.168.169.140
        port: 11111
        userName: canal
        password: canal
#springCloud
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:7001/eureka
  instance:
    prefer-ip-address: true
feign:
  hystrix:
    enabled: true
#hystrix 配置
hystrix:
  command:
    default:
      execution:
        timeout:
        #如果enabled设置为false,则请求超时交给ribbon控制
          enabled: true
        isolation:
          strategy: SEMAPHORE

(3)启动类创建

在包下创建启动类,代码如下:

@SpringBootApplication(exclude=DataSourceAutoConfiguration.class) // 忽略数据库连接
@EnableEurekaClient // springCloud注册中心
@EnableCanalClient  // canal客户端
public class CanalApplication 

    public static void main(String[] args) 
        SpringApplication.run(CanalApplication.class,args);
    

(4)监听创建

创建一个CanalDataEventListener类,实现对表增删改操作的监听,代码如下:

package com.changgou.service.canal.listener;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.DeleteListenPoint;
import com.xpand.starter.canal.annotation.InsertListenPoint;
import com.xpand.starter.canal.annotation.ListenPoint;
import com.xpand.starter.canal.annotation.UpdateListenPoint;


/**
 * Title:实现对mysql数据库数据日志的监听
 * Description:
 * @author WZQ
 * @version 1.0.0
 * @date 2020/3/4
 */
@CanalEventListener
public class CanalDataEventListener 

    /***
     * 增加数据监听
     * @param eventType 当前操作的类型 增加数据
     * @param rowData 发生变更的数据-->>增加的数据
     */
    @InsertListenPoint
    public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) 
        System.out.println("增加数据:");
        rowData.getAfterColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue()));
    

    // rowData.getAfterColumnsList() 之后的数据,适用于增加、修改
    // rowData.getBeforeColumnsList() 之前的数据,适用于删除

    /**
     * 修改数据监听
     * @param eventType
     * @param rowData 发生变更的数据-->>修改的数据
     */
    @UpdateListenPoint
    public void onEventUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) 
        System.out.println("修改前的数据:");
        rowData.getBeforeColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue()));
        System.out.println("修改后的数据:");
        rowData.getAfterColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue()));
    

    /***
     * 删除数据监听
     * @param eventType
     */
    @DeleteListenPoint
    public void onEventDelete(CanalEntry.EventType eventType, CanalEntry.RowData rowData) 
        System.out.println("删除数据:");
        rowData.getBeforeColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue()));
    

    /***
     * 自定义数据修改监听,指定监听的库,表
     * @param eventType
     * @param rowData
     */
    @ListenPoint(destination = "example", // 实例配置
            schema = "changgou_content", // 库
            table = "tb_content_category", "tb_content", // 表
            eventType = CanalEntry.EventType.UPDATE, CanalEntry.EventType.DELETE) // 监听类型,修改数据,删除数据
    public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) 
        System.out.println("自定义修改前的数据:");
        rowData.getBeforeColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue()));
        System.out.println("自定义修改后的数据:");
        rowData.getAfterColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue()));
    

(5)测试

启动canal微服务,然后修改任意数据库的表数据,canal微服务后台输出如下:

同步redis缓存例子

监听修改的数据并同步到redis缓存(Mysql、ES)。一般缓存的数据是静态数据,防止高并发。

广告图片缓存同步:

如上图,每次执行广告操作的时候,会记录操作日志到,然后将操作日志发送给canal,canal将操作记录发送给canal微服务,canal微服务根据修改的分类ID调用content微服务查询分类对应的所有广告,canal微服务再将所有广告存入到Redis缓存。

content微服务

service-content微服务是广告微服务的增删改查,这里不用写出来。canal微服务根据修改的分类ID调用content微服务查询分类对应的所有广告,这里要使用到fegin,微服务之间的调用。

 /***
     * 根据categoryId查询广告集合
     */
    @GetMapping(value = "/list/category/id")
    public ResponseResult<List<Content>> findByCategory(@PathVariable Long id)
        //根据分类ID查询广告集合
        List<Content> contents = contentService.findByCategory(id);
        return new ResponseResult<List<Content>>(true,StatusCode.OK,"查询成功!",contents);
    
#springCloud配置
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:7001/eureka
  instance:
    prefer-ip-address: true
    
feign:
  hystrix:
    enabled: true
    
#hystrix 配置
hystrix:
  command:
    default:
      execution:
        timeout:
        #如果enabled设置为false,则请求超时交给ribbon控制
          enabled: true
        isolation:
          strategy: SEMAPHORE
<!-- Spring Cloud -->
<spring-cloud.version>Greenwich.SR2</spring-cloud.version>

<!-- Spring Cloud -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>$spring-cloud.version</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

<!-- eureka注册中心,只有eureka-server微服务用到 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

<!-- eureka-client客户端 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<!-- openfeign -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

<!-- redis 使用-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

另外在另一个工程中加入fegin,方法和地址跟微服务controller一样,service-content-fegin中直接加,不用配置文件,不用启动类,放个接口就行:

@FeignClient(name="content")
@RequestMapping(value = "/content")
public interface ContentFeign 

    /***
     * 根据分类ID查询所有广告
     */
    @GetMapping(value = "/list/category/id")
    ResponseResult<List<Content>> findByCategory(@PathVariable Long id);

同步实现

在canal微服务中修改如下:

(1)配置redis

修改application.yml配置文件,添加redis配置,如下代码:

redis有设置密码则添加password

(2)启动类中开启feign

修改CanalApplication,添加@EnableFeignClients注解,扫描fegin包,可调用content微服务controller方法。代码如下:

(3)同步实现

修改监听类CanalDataEventListener,实现监听广告的增删改,并根据增删改的数据使用feign查询对应分类的所有广告,将广告存入到Redis中,代码如下:

上图代码如下:

/**
 * Title:实现对mysql数据库数据日志的监听
 * Description:
 * @author WZQ
 * @version 1.0.0
 * @date 2020/3/4
 */
@CanalEventListener
public class CanalDataEventListener 

    @Resource
    private ContentFeign contentFeign;

    //字符串
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    //自定义数据库的 操作来监听
    //destination = "example"

    /**
     * 自定义数据库的 操作来监听
     * @param eventType 数据库修改数据类型,增改删
     * @param rowData 数据
     */
    @ListenPoint(destination = "example",
            schema = "changgou_content",
            table = "tb_content", "tb_content_category",
            eventType = 
                    CanalEntry.EventType.UPDATE,
                    CanalEntry.EventType.DELETE,
                    CanalEntry.EventType.INSERT)
    public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) 
        //1.获取列名 为category_id的值
        String categoryId = getColumnValue(eventType, rowData);
        //2.调用feign 获取该分类下的所有的广告集合
        ResponseResult<List<Content>> categoryresut = contentFeign.findByCategory(Long.valueOf(categoryId));
        List<Content> data = categoryresut.getData();
        //3.使用redisTemplate存储到redis中,存json值
        stringRedisTemplate.boundValueOps("content_" + categoryId).set(JSON.toJSONString(data));
    

    private String getColumnValue(CanalEntry.EventType eventType, CanalEntry.RowData rowData) 
        String categoryId = "";
        //判断 如果是删除  则获取beforlist
        if (eventType == CanalEntry.EventType.DELETE) 
            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) 
                // 列名为category_id
                if (column.getName().equalsIgnoreCase("category_id")) 
                    categoryId = column.getValue();
                    return categoryId;
                
            
         else 
            //判断 如果是添加 或者是更新 获取afterlist
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) 
                if (column.getName().equalsIgnoreCase("category_id")) 
                    categoryId = column.getValue();
                    return categoryId;
                
            
        
        return categoryId;
    


测试:

修改数据库数据,可以看到Redis中的缓存跟着一起变化

阿里的数据同步神器——Canal

Canal介绍

Canal是阿里巴巴的数据同步工具,最初主要为了应对杭州和美国的双机房部署问题,目前也是国内互联网企业经常使用的数据增量同步解决方案。

原理:

  1. canal将自己伪装为MySQL的slave,向master发送dump协议
  2. master收到dump协议,数据发生修改后推送binary log给canal
  3. canal解析binary log对象,转换为增量数据,同步到ES、Redis等

Canal 安装

  1. MySQL配置

    注:本案例的mysql在windows上,linux环境的配置没有太大区别

    首先要让mysql开启binlog模式

    1) 进入mysql查看是否启动binlog

    SHOW VARIABLES LIKE '%log_bin%'
    

    log_bin为ON表示启动,为OFF则未启动,需要修改mysql配置文件启动log_bin

    windows配置文件是MySQL安装目录的my.ini

    linux在/etc/my.cnf

    修改:

    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1
    

    2) 创建用户

    进入mysql,创建canal用户并授权

    create user canal@'%'IDENTIFIED WITH mysql_native_password BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    
  2. 下载和安装canal

    到官网下载 https://github.com/alibaba/canal/releases

    这里使用的是1.1.4版本

    上传文件到Linux,解压到canal目录中

    cd /usr/local
    mkdir canal
    tar -vxf canal.deployer-1.1.4.tar.gz -C canal
    
  3. 配置Canal

    进入mysql,输入命令,记录文件名和位置

    show master status;
    

    进入canal目录,修改配置文件

    vi conf/example/instance.properties
    

  4. 启动Canal

    进入bin目录启动服务

    ./startup.sh
    

    关闭服务使用 stop.sh

    查看启动日志文件

    cat /usr/local/canal/logs/canal/canal.log
    cat /usr/local/canal/logs/example/example.log
    

    以上效果表示已经运行,如果出现异常可以按日志情况解决

    主要问题总结:

    1. 异常信息 authentication error,数据库账号和密码配置错误
    2. 异常信息 can’t find position,检查配置的文件名和位置,再删除conf/example/meta.dat 重启
    3. 客户端版本兼容问题,canal的版本和客户端的版本要一致

Canal 客户端

官方客户端

1) 引入依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

2) Java代码

package com.blb.canal_demo;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * 客户端测试
 */
public class ClientTest 
    public static void main(String args[]) 
        // 创建canal连接对象
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.223.223",
                11111), "example", "canal", "canal");
        try 
            //连接
            connector.connect();
            //订阅所有数据库和表
            connector.subscribe(".*\\\\..*");
            connector.rollback();
            while (true) 
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(1000);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) 
                    //没有数据,就休眠1秒
                    try 
                        Thread.sleep(1000);
                     catch (InterruptedException e) 
                    
                 else 
                    //有数据就打印
                    printEntry(message.getEntries());
                
                // 提交确认
                connector.ack(batchId);
            
         finally 
            connector.disconnect();
        
    

    private static void printEntry(List<Entry> entrys) 
        for (Entry entry : entrys) 
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||
                    entry.getEntryType() == EntryType.TRANSACTIONEND) 
                continue;
            
            RowChange rowChage = null;
            try 
                rowChage = RowChange.parseFrom(entry.getStoreValue());
             catch (Exception e) 
                throw new RuntimeException("ERROR parse data:" + entry.toString(),e);
            
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChage.getRowDatasList()) 
                //判断增删改操作
                if (eventType == EventType.DELETE) 
                    printColumn(rowData.getBeforeColumnsList());
                 else if (eventType == EventType.INSERT) 
                    printColumn(rowData.getAfterColumnsList());
                 else 
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                
            
        
    

    private static void printColumn(List<Column> columns) 
        for (Column column : columns) 
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        
    

修改了数据库中任意一张表的数据,canal客户端监听到mysql数据的修改

第三方客户端

官方客户端的代码比较繁琐,这里使用了第三方客户端采用SpringBoot整合,使用比较简单

https://github.com/chenqian56131/spring-boot-starter-canal

1) 引入依赖

首先下载该开源项目,安装到本地的maven中,在项目中就可以使用该依赖

<dependency>
    <groupId>com.xpand</groupId>
    <artifactId>starter-canal</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

2) 启动类添加注解

@EnableCanalClient

3)配置文件

canal.client.instances.example.host=192.168.223.223
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000

4) 监听器

package com.blb.canal_demo;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;

/**
 * 事件监听器
 */
@CanalEventListener
public class CanalListener 

    /**
     * 监听 erp数据库的customer表
     */
    @ListenPoint(schema = "erp",table = "customer")
    public void updateData(CanalEntry.EventType eventType, CanalEntry.RowData rowData)
        System.out.println("修改前");
        //打印改变之前的数据
        rowData.getBeforeColumnsList().forEach((c)-> System.out.print(c.getName()+":"+c.getValue()+"\\t"));
        System.out.println("\\n修改后");
        //打印改变之后的数据
        rowData.getAfterColumnsList().forEach((c)-> System.out.print(c.getName()+":"+c.getValue()+"\\t"));
    

Canal+RabbitMQ实现数据增量同步

实际开发过程中,我们常使用Canal配合RabbitMQ实现MySQL和其它存储系统的增量同步,下面是分布式在线教育系统中实现数据库和Elasticsearch的同步过程

步骤:

  1. 课程微服务对MySQL中的课程数据库课程表进行增删改操作,MySQL发送binlog给Canal
  2. 数据同步微服务通过Canal监听器获得具体的数据,通过RabbitMQ发送给搜索微服务
  3. 搜索微服务监听RabbitMQ消息,对Elasticsearch课程索引进行同步更新

课程表的增删改这里就不介绍了,主要看看同步服务的核心代码

  1. 依赖
<dependency>
    <groupId>com.xpand</groupId>
    <artifactId>starter-canal</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
  1. 配置文件
server.port=8701
# canal配置
canal.client.instances.example.host=192.168.223.223
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
# rabbitMQ配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=myhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
  1. MQ配置
/**
 * RabbitMQ的配置
 */
@Slf4j
@Configuration
public class RabbitMQConfig 

    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    public static final String KEY_COURSE_SAVE = "key.course.save";
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    public static final String COURSE_EXCHANGE = "edu.course.exchange";

    @Bean
    public Queue queueCourseSave() 
        return new Queue(QUEUE_COURSE_SAVE);
    

    @Bean
    public Queue queueCourseRemove() 
        return new Queue(QUEUE_COURSE_REMOVE);
    

    @Bean
    public TopicExchange topicExchange() 
        return new TopicExchange(COURSE_EXCHANGE);
    

    @Bean
    public Binding bindCourseSave() 
        return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);
    

    @Bean
    public Binding bindCourseRemove() 
        return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);
    

  1. Canal监听器
/**
 * 课程表数据同步监听器
 */
@Slf4j
@CanalEventListener
public class CourseSyncListener 

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 监听课程表的修改
     */
    @ListenPoint(schema = "edu_course",table = "course")
    public void handleCourseChange(EventType eventType, RowData rowData)
        log.info("course表操作:",eventType);
        if(eventType == EventType.INSERT || eventType == EventType.UPDATE)
            //获得修改后的数据
            Map<String,String> map = new HashMap<>();
            rowData.getAfterColumnsList().forEach(c -> 
                map.put(c.getName(),c.getValue());
            );
            String json = JSON.toJSONString(map);
            log.info("保存数据:",json);
            //发送给mq,通知搜索服务进行添加
            rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,
                RabbitMQConfig.KEY_COURSE_SAVE, json));
        else if(eventType == EventType.DELETE)
            //获得删除前的id
            Long[] id = new Long[1];
            rowData.getBeforeColumnsList().forEach(c -> 
                if("id".equals(c.getName()))
                    id[0] = Long.valueOf(c.getValue());
                
            );
            log.info("删除数据:",id[0]);
            //发送给mq,通知搜索服务进行删除
            rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,
                    RabbitMQConfig.KEY_COURSE_REMOVE, Long.valueOf(id[0]));
        else
            log.info("不支持其它操作");
        
    

搜索服务的消息监听

@Slf4j
@Component
public class CourseMQListener 

    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    public static final String KEY_COURSE_SAVE = "key.course.save";
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    public static final String COURSE_EXCHANGE = "course.exchange";
    
    @Autowired
    ICourseService courseService;

    /**
     * 监听课程添加和更新操作
     */
    @RabbitListener(bindings = 
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE,
                            type = ExchangeTypes.TOPIC,
                            ignoreDeclarationExceptions = "true")
                    , key = KEY_COURSE_SAVE))
    public void receiveCourseSaveMessage(String json, Channel channel, Message message) throws IOException 
        log.info("保存课程课程:",json);
        //将消息转为课程,保存到es中
        Course course = JSON.parseObject(json,Course.class);
        //保存课程到ElasticSearch中
        courseService.saveOrUpdate(course);
    

    /**
     * 监听课程删除操作
     */
    @RabbitListener(bindings = 
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE,
                            type = ExchangeTypes.TOPIC,
                            ignoreDeclarationExceptions = "true")
                    , key = KEY_COURSE_REMOVE))
    public void receiveCourseDeleteMessage(Long id) 
        courseService.removeById(id);
        log.info("课程删除完成:",id);
    


以上是关于Canal同步数据的主要内容,如果未能解决你的问题,请参考以下文章

canal+kafka+go处理Mysql数据库增量信息

Canal

大数据Canal:使用Canal同步MySQL数据

Canal增量数据同步利器介绍与安装

Canal同步数据

数据库增量日志监听canal