阿里的数据同步神器——Canal

Posted 恒哥~Bingo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了阿里的数据同步神器——Canal相关的知识,希望对你有一定的参考价值。

Canal介绍

Canal是阿里巴巴的数据同步工具,最初主要为了应对杭州和美国的双机房部署问题,目前也是国内互联网企业经常使用的数据增量同步解决方案。

原理:

  1. canal将自己伪装为mysql的slave,向master发送dump协议
  2. master收到dump协议,数据发生修改后推送binary log给canal
  3. canal解析binary log对象,转换为增量数据,同步到ES、Redis等

Canal 安装

  1. MySQL配置

    注:本案例的mysql在windows上,linux环境的配置没有太大区别

    首先要让mysql开启binlog模式

    1) 进入mysql查看是否启动binlog

    SHOW VARIABLES LIKE '%log_bin%'
    

    log_bin为ON表示启动,为OFF则未启动,需要修改mysql配置文件启动log_bin

    windows配置文件是MySQL安装目录的my.ini

    linux在/etc/my.cnf

    修改:

    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1
    

    2) 创建用户

    进入mysql,创建canal用户并授权

    create user canal@'%'IDENTIFIED WITH mysql_native_password BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    
  2. 下载和安装canal

    到官网下载 https://github.com/alibaba/canal/releases

    这里使用的是1.1.4版本

    上传文件到Linux,解压到canal目录中

    cd /usr/local
    mkdir canal
    tar -vxf canal.deployer-1.1.4.tar.gz -C canal
    
  3. 配置Canal

    进入mysql,输入命令,记录文件名和位置

    show master status;
    

    进入canal目录,修改配置文件

    vi conf/example/instance.properties
    

  4. 启动Canal

    进入bin目录启动服务

    ./startup.sh
    

    关闭服务使用 stop.sh

    查看启动日志文件

    cat /usr/local/canal/logs/canal/canal.log
    cat /usr/local/canal/logs/example/example.log
    

    以上效果表示已经运行,如果出现异常可以按日志情况解决

    主要问题总结:

    1. 异常信息 authentication error,数据库账号和密码配置错误
    2. 异常信息 can’t find position,检查配置的文件名和位置,再删除conf/example/meta.dat 重启
    3. 客户端版本兼容问题,canal的版本和客户端的版本要一致

Canal 客户端

官方客户端

1) 引入依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

2) Java代码

package com.blb.canal_demo;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * 客户端测试
 */
public class ClientTest 
    public static void main(String args[]) 
        // 创建canal连接对象
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.223.223",
                11111), "example", "canal", "canal");
        try 
            //连接
            connector.connect();
            //订阅所有数据库和表
            connector.subscribe(".*\\\\..*");
            connector.rollback();
            while (true) 
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(1000);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) 
                    //没有数据,就休眠1秒
                    try 
                        Thread.sleep(1000);
                     catch (InterruptedException e) 
                    
                 else 
                    //有数据就打印
                    printEntry(message.getEntries());
                
                // 提交确认
                connector.ack(batchId);
            
         finally 
            connector.disconnect();
        
    

    private static void printEntry(List<Entry> entrys) 
        for (Entry entry : entrys) 
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||
                    entry.getEntryType() == EntryType.TRANSACTIONEND) 
                continue;
            
            RowChange rowChage = null;
            try 
                rowChage = RowChange.parseFrom(entry.getStoreValue());
             catch (Exception e) 
                throw new RuntimeException("ERROR parse data:" + entry.toString(),e);
            
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChage.getRowDatasList()) 
                //判断增删改操作
                if (eventType == EventType.DELETE) 
                    printColumn(rowData.getBeforeColumnsList());
                 else if (eventType == EventType.INSERT) 
                    printColumn(rowData.getAfterColumnsList());
                 else 
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                
            
        
    

    private static void printColumn(List<Column> columns) 
        for (Column column : columns) 
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        
    

修改了数据库中任意一张表的数据,canal客户端监听到mysql数据的修改

第三方客户端

官方客户端的代码比较繁琐,这里使用了第三方客户端采用SpringBoot整合,使用比较简单

https://github.com/chenqian56131/spring-boot-starter-canal

1) 引入依赖

首先下载该开源项目,安装到本地的maven中,在项目中就可以使用该依赖

<dependency>
    <groupId>com.xpand</groupId>
    <artifactId>starter-canal</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

2) 启动类添加注解

@EnableCanalClient

3)配置文件

canal.client.instances.example.host=192.168.223.223
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000

4) 监听器

package com.blb.canal_demo;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;

/**
 * 事件监听器
 */
@CanalEventListener
public class CanalListener 

    /**
     * 监听 erp数据库的customer表
     */
    @ListenPoint(schema = "erp",table = "customer")
    public void updateData(CanalEntry.EventType eventType, CanalEntry.RowData rowData)
        System.out.println("修改前");
        //打印改变之前的数据
        rowData.getBeforeColumnsList().forEach((c)-> System.out.print(c.getName()+":"+c.getValue()+"\\t"));
        System.out.println("\\n修改后");
        //打印改变之后的数据
        rowData.getAfterColumnsList().forEach((c)-> System.out.print(c.getName()+":"+c.getValue()+"\\t"));
    

Canal+RabbitMQ实现数据增量同步

实际开发过程中,我们常使用Canal配合RabbitMQ实现MySQL和其它存储系统的增量同步,下面是分布式在线教育系统中实现数据库和Elasticsearch的同步过程

步骤:

  1. 课程微服务对MySQL中的课程数据库课程表进行增删改操作,MySQL发送binlog给Canal
  2. 数据同步微服务通过Canal监听器获得具体的数据,通过RabbitMQ发送给搜索微服务
  3. 搜索微服务监听RabbitMQ消息,对Elasticsearch课程索引进行同步更新

课程表的增删改这里就不介绍了,主要看看同步服务的核心代码

  1. 依赖
<dependency>
    <groupId>com.xpand</groupId>
    <artifactId>starter-canal</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
  1. 配置文件
server.port=8701
# canal配置
canal.client.instances.example.host=192.168.223.223
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
# rabbitMQ配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=myhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
  1. MQ配置
/**
 * RabbitMQ的配置
 */
@Slf4j
@Configuration
public class RabbitMQConfig 

    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    public static final String KEY_COURSE_SAVE = "key.course.save";
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    public static final String COURSE_EXCHANGE = "edu.course.exchange";

    @Bean
    public Queue queueCourseSave() 
        return new Queue(QUEUE_COURSE_SAVE);
    

    @Bean
    public Queue queueCourseRemove() 
        return new Queue(QUEUE_COURSE_REMOVE);
    

    @Bean
    public TopicExchange topicExchange() 
        return new TopicExchange(COURSE_EXCHANGE);
    

    @Bean
    public Binding bindCourseSave() 
        return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);
    

    @Bean
    public Binding bindCourseRemove() 
        return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);
    

  1. Canal监听器
/**
 * 课程表数据同步监听器
 */
@Slf4j
@CanalEventListener
public class CourseSyncListener 

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 监听课程表的修改
     */
    @ListenPoint(schema = "edu_course",table = "course")
    public void handleCourseChange(EventType eventType, RowData rowData)
        log.info("course表操作:",eventType);
        if(eventType == EventType.INSERT || eventType == EventType.UPDATE)
            //获得修改后的数据
            Map<String,String> map = new HashMap<>();
            rowData.getAfterColumnsList().forEach(c -> 
                map.put(c.getName(),c.getValue());
            );
            String json = JSON.toJSONString(map);
            log.info("保存数据:",json);
            //发送给mq,通知搜索服务进行添加
            rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,
                RabbitMQConfig.KEY_COURSE_SAVE, json));
        else if(eventType == EventType.DELETE)
            //获得删除前的id
            Long[] id = new Long[1];
            rowData.getBeforeColumnsList().forEach(c -> 
                if("id".equals(c.getName()))
                    id[0] = Long.valueOf(c.getValue());
                
            );
            log.info("删除数据:",id[0]);
            //发送给mq,通知搜索服务进行删除
            rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,
                    RabbitMQConfig.KEY_COURSE_REMOVE, Long.valueOf(id[0]));
        else
            log.info("不支持其它操作");
        
    

搜索服务的消息监听

@Slf4j
@Component
public class CourseMQListener 

    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    public static final String KEY_COURSE_SAVE = "key.course.save";
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    public static final String COURSE_EXCHANGE = "course.exchange";
    
    @Autowired
    ICourseService courseService;

    /**
     * 监听课程添加和更新操作
     */
    @RabbitListener(bindings = 
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE,
                            type = ExchangeTypes.TOPIC,
                            ignoreDeclarationExceptions = "true")
                    , key = KEY_COURSE_SAVE))
    public void receiveCourseSaveMessage(String json, Channel channel, Message message) throws IOException 
        log.info("保存课程课程:",json);
        //将消息转为课程,保存到es中
        Course course = JSON.parseObject(json,Course.class);
        //保存课程到ElasticSearch中
        courseService.saveOrUpdate(course);
    

    /**
     * 监听课程删除操作
     */
    @RabbitListener(bindings = 
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE,
                            type = ExchangeTypes.TOPIC,
                            ignoreDeclarationExceptions = "true")
                    , key = KEY_COURSE_REMOVE))
    public void receiveCourseDeleteMessage(Long id) 
        courseService.removeById(id);
        log.info("课程删除完成:",id);
    


以上是关于阿里的数据同步神器——Canal的主要内容,如果未能解决你的问题,请参考以下文章

同步MySQL数据到ES神器mysqlmom介绍

阿里Canal框架(数据同步中间件)初步实践

Canal同步数据

Canal同步数据库实现

实战!Spring Boot 整合 阿里开源中间件 Canal 实现数据增量同步!

阿里开源项目——Canal