canal Quick Start
Posted xxzblog
Server
Server IP: 192.168.1.101
Download canal to /opt/softwares
wget https://github.com/alibaba/canal/releases/download/canal-1.0.26-preview-2/canal.deployer-1.0.26-SNAPSHOT.tar.gz
Extract the archive
mkdir /opt/canal && tar -zxvf canal.deployer-1.0.26-SNAPSHOT.tar.gz -C /opt/canal/
Configure MySQL
vim /etc/my.cnf

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
Create the user
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
Create the database
CREATE DATABASE IF NOT EXISTS canal_tsdb default charset utf8 COLLATE utf8_general_ci;
Configure canal
cp /opt/canal/conf/example/instance.properties /opt/canal/conf/example/instance.properties.bak
vim /opt/canal/conf/example/instance.properties

#################################################
## mysql serverId
canal.instance.mysql.slaveId=0

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=12172
canal.instance.master.timestamp=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
#canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName=test
canal.instance.connectionCharset=UTF-8

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
#################################################
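The table filter value is a regular expression matched against `schema.table` names, and the backslash is doubled because a properties file treats `\` as an escape character. The sketch below uses only `java.util.Properties` and `java.util.regex` to show what the value becomes after loading and which names it would accept. The class name `FilterRegexSketch` and the comma-splitting helper are illustrative assumptions; canal's own filter implementation may differ in details such as case handling.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

// Hypothetical helper, not part of canal: illustrates properties unescaping and regex matching.
final class FilterRegexSketch {

    // Parses one "key=value" properties line and returns the unescaped value for key.
    static String readProperty(String line, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(line));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    // True when schemaDotTable fully matches at least one comma-separated pattern.
    static boolean accepts(String filter, String schemaDotTable) {
        for (String regex : filter.split(",")) {
            if (Pattern.matches(regex.trim(), schemaDotTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // The file contains two backslashes; the Java literal below doubles each of them.
        String value = readProperty("canal.instance.filter.regex=.*\\\\..*",
                "canal.instance.filter.regex");
        System.out.println("loaded value: " + value);
        System.out.println("test.user accepted: " + accepts(value, "test.user"));
        System.out.println("other.user accepted by test\\..*: " + accepts("test\\..*", "other.user"));
    }
}
```

After loading, the value contains a single backslash, so the pattern reads "any schema, a literal dot, any table". A narrower pattern such as `test\\..*` in the file would restrict the match to tables in the `test` schema.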
Edit canal.properties
vim /opt/canal/conf/canal.properties

# Find the tsdb settings, comment out the first (h2) line and enable the second (mysql) line
#canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
canal.instance.tsdb.spring.xml=classpath:spring/tsdb/mysql-tsdb.xml
Start canal
sh /opt/canal/bin/startup.sh
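Before moving on to the client, it can help to confirm that the server is accepting connections on the port the client will use (192.168.1.101:11111 in this walkthrough). The following is a minimal sketch using only the JDK; the class name `CanalPortCheck` and the 2-second timeout are assumptions chosen for illustration. A successful TCP connect only shows that the port is open, not that the `example` instance is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of canal: a plain TCP reachability probe.
final class CanalPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable host: treat all as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the address and port used elsewhere in this walkthrough.
        String host = args.length > 0 ? args[0] : "192.168.1.101";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 11111;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```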
Client
Add the dependency
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.23</version>
</dependency>
Client main class
package canal.test;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * @author tt
 */
public class SimpleCanalClientExample {

    public static void main(String[] args) {
        // Create the connection to the canal server and the "example" instance
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.1.101", 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            // flag is never set to false, so the loop keeps polling indefinitely;
            // set it to false to exit after totalEmptyCount consecutive empty polls
            boolean flag = true;
            while (flag || emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // ignored: keep polling
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // on processing failure, roll back the batch
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // Skip transaction begin/end markers; only row changes are printed
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
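The polling loop in the client uses the condition `flag || emptyCount < totalEmptyCount`, and because `flag` stays `true` the client never reaches the "empty too many times, exit" line. If you would rather have the client stop after a run of empty polls, the counting logic on its own can be sketched as follows. This is a standalone illustration without the canal dependency: `EmptyPollLoopSketch`, `pollUntilIdle`, and the list of batch sizes are hypothetical stand-ins for `connector.getWithoutAck(...)` and `message.getEntries().size()`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.IntSupplier;

// Hypothetical sketch, not part of canal: exit after N consecutive empty polls.
final class EmptyPollLoopSketch {

    // Polls until maxEmptyPolls consecutive polls return no entries;
    // returns the total number of entries seen.
    static int pollUntilIdle(IntSupplier nextBatchSize, int maxEmptyPolls) {
        int emptyCount = 0;
        int total = 0;
        while (emptyCount < maxEmptyPolls) {
            int size = nextBatchSize.getAsInt();
            if (size == 0) {
                emptyCount++;      // one more idle poll in a row
            } else {
                emptyCount = 0;    // any data resets the idle counter
                total += size;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed batch sizes standing in for successive poll results.
        Iterator<Integer> sizes = Arrays.asList(3, 0, 0, 5, 0, 0, 0).iterator();
        int total = pollUntilIdle(() -> sizes.hasNext() ? sizes.next() : 0, 3);
        System.out.println("entries processed: " + total); // prints "entries processed: 8"
    }
}
```

In the real client the same effect comes from setting `flag` to `false` (or removing it from the condition), with the one-second sleep kept between empty polls.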
That concludes the quick start.