canal Quick Start

Posted xxzblog

Server

Server IP: 192.168.1.101

  1. Download canal to /opt/softwares

    mkdir -p /opt/softwares && cd /opt/softwares
    wget https://github.com/alibaba/canal/releases/download/canal-1.0.26-preview-2/canal.deployer-1.0.26-SNAPSHOT.tar.gz
  2. Extract the archive

    mkdir -p /opt/canal && tar -zxvf canal.deployer-1.0.26-SNAPSHOT.tar.gz -C /opt/canal/
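  3. Configure MySQL

    vim /etc/my.cnf

    [mysqld]
    # Enable the binary log
    log-bin=mysql-bin
    # canal needs row-based binlog events
    binlog-format=ROW
    # Must be unique; it must not clash with canal's slaveId
    server_id=1
    # Restart MySQL after saving so that these settings take effect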
  4. Create the user

    CREATE USER canal IDENTIFIED BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
  5. Create the database

    CREATE DATABASE IF NOT EXISTS canal_tsdb default charset utf8 COLLATE utf8_general_ci;
  6. Configure canal

    cp /opt/canal/conf/example/instance.properties /opt/canal/conf/example/instance.properties.bak
    vim /opt/canal/conf/example/instance.properties

    #################################################
    ## mysql serverId
    canal.instance.mysql.slaveId=0

    # position info
    canal.instance.master.address=127.0.0.1:3306
    canal.instance.master.journal.name=mysql-bin.000001
    canal.instance.master.position=12172
    canal.instance.master.timestamp=

    # table meta tsdb info
    canal.instance.tsdb.enable=true
    #canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
    #canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
    canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
    canal.instance.tsdb.dbUsername=canal
    canal.instance.tsdb.dbPassword=canal

    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =

    # username/password
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.defaultDatabaseName=test
    canal.instance.connectionCharset=UTF-8

    # table regex
    canal.instance.filter.regex=.*\\..*

    # table black regex
    canal.instance.filter.black.regex=
    #################################################

    Then edit canal.properties:

    vim /opt/canal/conf/canal.properties

    # Find the tsdb settings: comment out the h2 line and enable the mysql line
    #canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
    canal.instance.tsdb.spring.xml=classpath:spring/tsdb/mysql-tsdb.xml

  7. Start canal

    sh /opt/canal/bin/startup.sh
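
After startup it can be useful to confirm that the server is actually accepting connections before writing any client code. The following is a minimal sketch, not part of canal itself: `CanalPortCheck` and `isPortOpen` are hypothetical names, and the host and port (192.168.1.101 and 11111) are simply the values this article uses for the client connection, so adjust them if your setup differs.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper, not part of canal: checks whether a TCP port accepts connections. */
class CanalPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable/reachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are assumptions taken from this article's setup.
        String host = args.length > 0 ? args[0] : "192.168.1.101";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 11111;
        System.out.println(host + ":" + port + " reachable: " + isPortOpen(host, port, 2000));
    }
}
```

An open port only shows that something is listening; it does not prove that the instance has connected to MySQL, so the canal log files are still the place to look if the client receives no data.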

Client

  1. Add the dependency

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.0.23</version>
    </dependency>
  2. Client main class

    package canal.test;

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;

    import java.net.InetSocketAddress;
    import java.util.List;

    /**
     * @author tt
     */
    public class SimpleCanalClientExample {

        public static void main(String[] args) {
            // Create the connection to the canal server (destination "example", no credentials)
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress("192.168.1.101", 11111), "example", "", "");
            int batchSize = 1000;
            int emptyCount = 0;
            try {
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                int totalEmptyCount = 120;
                // While flag is true the loop never exits; set it to false to stop
                // after totalEmptyCount consecutive empty polls.
                boolean flag = true;
                while (flag || emptyCount < totalEmptyCount) {
                    Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries without acknowledging
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        System.out.println("empty count : " + emptyCount);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            // ignored: keep polling
                        }
                    } else {
                        emptyCount = 0;
                        // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                        printEntry(message.getEntries());
                    }

                    connector.ack(batchId); // acknowledge the batch
                    // connector.rollback(batchId); // on processing failure, roll the batch back
                }

                System.out.println("empty too many times, exit");
            } finally {
                connector.disconnect();
            }
        }

        private static void printEntry(List<CanalEntry.Entry> entrys) {
            for (CanalEntry.Entry entry : entrys) {
                // Skip transaction begin/end markers; only row changes are printed
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                        || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }

                CanalEntry.RowChange rowChage = null;
                try {
                    rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }

                CanalEntry.EventType eventType = rowChage.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));

                for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == CanalEntry.EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }

        private static void printColumn(List<CanalEntry.Column> columns) {
            for (CanalEntry.Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    }
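
The table filter appears twice in this setup: as `canal.instance.filter.regex` in instance.properties and as the argument to `connector.subscribe(...)`. In both places the backslash has to be doubled, because a properties file and a Java string literal each consume one level of escaping. The sketch below uses only `java.util.Properties` and `java.util.regex` to show that both spellings end up as the same regex; `FilterRegexDemo` and its methods are hypothetical names, and canal's own filter implementation may behave differently in details (for example in how it handles multiple patterns).

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Pattern;

/** Hypothetical demo, not canal code: shows how the escaped filter regex is interpreted. */
class FilterRegexDemo {

    /** Reads canal.instance.filter.regex the way java.util.Properties parses a .properties file. */
    static String loadFilter(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return props.getProperty("canal.instance.filter.regex");
    }

    /** Returns true if the name "schema.table" fully matches the regex. */
    static boolean matches(String regex, String schema, String table) {
        return Pattern.matches(regex, schema + "." + table);
    }

    public static void main(String[] args) {
        // The file line `canal.instance.filter.regex=.*\\..*`, written here as a Java string literal.
        String fromFile = loadFilter("canal.instance.filter.regex=.*\\\\..*\n");
        System.out.println(fromFile);                             // prints .*\..*
        System.out.println(fromFile.equals(".*\\..*"));           // prints true
        System.out.println(matches(fromFile, "test", "user"));    // prints true
        System.out.println(matches("test\\..*", "prod", "user")); // prints false
    }
}
```

The last line illustrates a narrower filter: `test\\..*` would match only tables in the `test` schema, whereas `.*\\..*` matches every table in every schema.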

That concludes the quick start.
