canal

Posted 狂乱的贵公子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal相关的知识,希望对你有一定的参考价值。

1.  canal 简介

canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

canal 工作原理:

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

官网地址:https://github.com/alibaba/canal

2.  canal 服务端

 (1)修改 /etc/my.cnf 文件,增加以下配置(PS:改完后别忘了重启数据库 systemctl restart mysqld )

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

备忘录

show variables like \'log_bin\';
show binary logs;
show master status;
select * from mysql.`user`;

(2)授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY \'Canal@123456\';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO \'canal\'@\'%\';
-- GRANT ALL PRIVILEGES ON *.* TO \'canal\'@\'%\' ;
FLUSH PRIVILEGES;

 (3)下载canal,并解压缩

https://github.com/alibaba/canal/releases

(4)修改配置

vim conf/example/instance.properties

启动

sh bin/startup.sh

查看日志

# 查看 server 日志
vi logs/canal/canal.log

# 查看 instance 的日志
vi logs/example/example.log

# 关闭
sh bin/stop.sh

3. canal 客户端

依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.5</version>
</dependency>

配置

canal.server.host=192.168.28.32
canal.server.port=11111
canal.server.destination=example
canal.server.username=canal
canal.server.password=canal

spring.kafka.bootstrap-servers=192.168.28.32:9092

logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="30 seconds" debug="false">
    <property name="log.charset" value="utf-8" />
    <property name="log.pattern" value="%dyyyy-MM-dd HH:mm:ss [%thread] %-5level %logger36 - %msg%n" />
    <property name="log.dir" value="./logs" />

    <!--输出到控制台-->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>$log.pattern</pattern>
            <charset>$log.charset</charset>
        </encoder>
    </appender>
    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>$log.dir/my-canal-client.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>$log.dir/my-canal-client.%dyyyy-MM-dd.log</fileNamePattern>
            <maxHistory>30</maxHistory>
            <totalSizeCap>3GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>$log.pattern</pattern>
        </encoder>
    </appender>

    <root level="info">
        <appender-ref ref="console" />
        <appender-ref ref="file" />
    </root>
</configuration>

测试代码

package com.my.component.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;

import java.util.List;

/**
 * @Author ChengJianSheng
 * @Date 2022/1/13
 */
@Slf4j
public class Demo 

    private volatile boolean running = false;
    private CanalConnector connector;
    private Thread thread = null;

    public Demo(CanalConnector connector) 
        this.connector = connector;
    

    public void start() 
        Assert.notNull(connector, "connector is null");
        thread = new Thread(this::process);
        running = true;
        thread.start();
    

    public void stop() 
        if (!running) 
            return;
        
        running = false;
        if (thread != null) 
            try 
                thread.join();
             catch (InterruptedException e) 
                // ignore
            
        
    

    private void process() 
        int batchSize = 5 * 1024;
        while (running) 
            try 
                connector.connect();
                connector.subscribe(".*\\\\..*");
                while (running) 
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) 
                        // try 
                        // Thread.sleep(1000);
                        //  catch (InterruptedException e) 
                        // 
                     else 
                        printEntry(message.getEntries());
                    

                    if (batchId != -1) 
                        connector.ack(batchId); // 提交确认
                    
                
             catch (Throwable e) 
                log.error("process error!", e);
                try 
                    Thread.sleep(1000L);
                 catch (InterruptedException e1) 
                    // ignore
                

                connector.rollback(); // 处理失败, 回滚数据
             finally 
                connector.disconnect();
            
        
    

    private void printEntry(List<CanalEntry.Entry> entrys) 
        for (CanalEntry.Entry entry : entrys) 
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) 
                continue;
            

            CanalEntry.RowChange rowChage = null;
            try 
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
             catch (Exception e) 
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) 
                if (eventType == CanalEntry.EventType.DELETE) 
                    printColumn(rowData.getBeforeColumnsList());
                 else if (eventType == CanalEntry.EventType.INSERT) 
                    printColumn(rowData.getAfterColumnsList());
                 else 
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                
            
        
    

    private void printColumn(List<CanalEntry.Column> columns) 
        for (CanalEntry.Column column : columns) 
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        
    

启动

package com.my.component.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.net.SocketAddress;

/**
 * @Author ChengJianSheng
 * @Date 2022/1/4
 */
@Slf4j
@Component
public class MyCommandLineRunner implements CommandLineRunner 

    @Autowired
    private CanalServerProperties canalServerProperties;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void run(String... args) throws Exception 
        String destination = "example";
//        String ip = AddressUtils.getHostIp();
//        String ip = canalServerProperties.getHost();
        String ip = "192.168.28.32";
        SocketAddress address = new InetSocketAddress(ip, 11111);
        CanalConnector connector = CanalConnectors.newSingleConnector(address, destination, "canal", "canal");

        Demo demo = new Demo(connector);
        demo.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> 
            try 
                log.info("## stop the canal client");
                demo.stop();
             catch (Throwable e) 
                log.warn("##something goes wrong when stopping canal:", e);
             finally 
                log.info("## canal client is down.");
            
        ));
    

 

以上是关于canal的主要内容,如果未能解决你的问题,请参考以下文章

springboot 整合 canal

canal+Kafka实现mysql与redis数据同步

canal+Kafka实现mysql与redis数据同步

canal+Kafka实现mysql与redis数据同步

mysql进阶:canal实现mysql数据同步到redis|实现自定义canal客户端

大数据Spark Streaming实时处理Canal同步binlog数据