ElasticSearch实战(四十七)-Canal 实现 MySQL 数据实时同步方案

Posted 张志翔ۤ

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ElasticSearch实战(四十七)-Canal 实现 MySQL 数据实时同步方案相关的知识,希望对你有一定的参考价值。

        Canal 主要用途是基于 mysql 数据库增量日志解析,提供增量数据订阅和消费

        早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

        基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

        当前的 Canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

一、Canal 工作原理

        1、工作原理

        MySQL主备复制原理:

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

        Canal 工作原理:

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

二、Canal 快速开始

        对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

        注:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步

        授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

       1、启动

        下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.1.5 版本为例,命令如下:

  $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

        解压缩,命令如下:

$ mkdir /tmp/canal
$ tar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal

        解压完成后,进入 /tmp/canal 目录,可以看到如下结构:

drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs

        配置修改,命令如下:

  $ vim conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\\*\\\\\\\\..\\*
  • canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
  • 如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false

        启动,命令如下:

  $ sh bin/startup.sh

        查看 server 日志,命令如下:

  $ vim logs/canal/canal.log</pre>
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

        查看 instance 的日志,命令如下:

  $ vim logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

        2、关闭

  $ sh bin/stop.sh

        到此就完成了快速启动,一般工作中不会这么干,一般是和kafka一起使用。 

三、Kafka快速开始

        1、环境版本

  • 操作系统:CentOS release 7.8
  • java版本: jdk1.8
  • kafka 版本: kafka_2.11-1.1.1.tgz

        2、安装kafka

        下载压缩包, 复制到固定目录并解压,命令如下:

$ wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz
$ mkdir  -p /usr/local/kafka
$ cp   kafka_2.11-1.1.1.tgz   /usr/local/kafka
$ tar -zxvf kafka_2.11-1.1.1.tgz

        3、修改配置文件

$ vim /usr/local/kafka/kafka_2.11-1.1.1/config/server.properties 
zookeeper.connect=192.168.1.110:2181
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.1.117:9092 #本机ip
# ...

        4、启动server

        启动kafka,命令如下:

  $ bin/kafka-server-start.sh  -daemon  config/server.properties &

        查看所有topic,命令如下:

  $ bin/kafka-topics.sh --list --zookeeper 192.168.1.110:2181

        查看指定 topic 下面的数据,命令如下:

$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.117:9092  --from-beginning --topic example_t
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

        到此就完成了kafka快速启动。

四、Zookeeper 快速开始

        1、环境版本

  • 操作系统:CentOS release 7.8
  • java版本: jdk1.8
  • zookeeper版本: zookeeper-3.4.11

        2、安装jdk

        安装文章已经准备好,传送门如下:

        Centos7 安装Jdk1.8

        3、安装zookeeper

        下载源码包,并解压,命令如下:

$ wget http://mirror.olnevhost.net/pub/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz
$ tar zxvf zookeeper-3.4.11.tar.gz  
$ mv zookeeper-3.4.11 /usr/local/zookeeper

        4、修改环境变量

        编辑 /etc/profile 文件, 在文件末尾添加以下环境变量配置:

# ZooKeeper Env
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

        运行以下命令使环境变量生效:

  $ source /etc/profile

        5、重命名配置文件

        初次使用 ZooKeeper 时,需要将$ZOOKEEPER_HOME/conf 目录下的 zoo_sample.cfg 重命名为 zoo.cfg, zoo.cfg,命令如下:

  $ mv  $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg

        6、单机模式--修改配置文件

        创建目录/usr/local/zookeeper/data 和/usr/local/zookeeper/logs 修改配置文件

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181

        如果是多节点,配置文件中尾部增加

server.1=192.168.1.110:2888:3888
server.2=192.168.1.111:2888:3888
server.3=192.168.1.112:2888:3888

        同时,增加

#master
echo "1">/usr/local/zookeeper/data/myid

#slave1
echo "2">/usr/local/zookeeper/data/myid

#slave2
echo "3">/usr/local/zookeeper/data/myid

        7、启动 ZooKeeper 服务

# cd /usr/local/zookeeper/zookeeper-3.4.11/bin
# ./zkServer.sh  start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.11/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

zkServer.sh  status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower

        8、验证ZooKeeper服务

        服务启动完成后,可以使用 telnet 和 stat 命令验证服务器启动是否正常:

# telnet 127.0.0.1 2181
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
stat
Zookeeper version: 3.4.11-37e277162d567b55a07d1755f0b31c32e93c01a0, built on 11/01/2017 18:06 GMT
Clients:
/127.0.0.1:48430[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x0
Mode: standalone
Node count: 4
Connection closed by foreign host.

        9、停止 ZooKeeper 服务

        想要停止 ZooKeeper 服务, 可以使用如下命令:

# cd /usr/local/zookeeper/zookeeper-3.4.11/bin
# ./zkServer.sh  stop
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.11/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED

        10、zk ui安装 (选装,页面查看zk的数据)

        拉取代码,命令如下:

  $ git clone https://github.com/DeemOpen/zkui.git

        源码编译需要安装 maven,命令如下:

$ wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
$ cd zkui/
$ yum install -y maven
$ mvn clean install

        修改配置文件默认值,命令如下:

$ vim config.cfg
serverPort=9090     #指定端口
zkServer=192.168.1.110:2181
sessionTimeout=300000

        11、启动程序至后台

        2.0-SNAPSHOT 会随软件的更新版本不同而不同,执行时请查看target 目录中真正生成的版本,命令如下:

  $ nohup java -jar target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar & 

        用浏览器访问:http://192.168.1.110:9090/

五、Canal Kafka 快速启动

        Canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:

        1、环境版本

  • 操作系统:CentOS release 7.8
  • java版本: jdk1.8
  • canal 版本: 请下载最新的安装包,本文以当前v1.1.1 的canal.deployer-1.1.1.tar.gz为例
  • MySQL版本 :5.7.18
  • 注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口

        2、安装zookeeper

        参照上面的zookeeper快速开始。

        3、安装kafka

        参照上面的kafka快速开始。

        4、安装canal.server

        下载压缩包,到官网地址(release)下载最新压缩包,请下载 canal.deployer-latest.tar.gz,将canal.deployer 复制到固定目录并解压,命令如下:

$ mkdir -p /usr/local/canal
$ cp canal.deployer-1.1.5.tar.gz /usr/local/canal
$ tar -zxvf canal.deployer-1.1.5.tar.gz 

        5、配置修改参数

        首先修改 instance 配置文件,命令如下:

$ vim conf/example/instance.properties
#  按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=example
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\\\..*,.*\\\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

        对应 ip 地址的MySQL 数据库需进行相关初始化与设置, 可参考上述 Canal 快速开始。

        然后修改 canal 配置文件,打开配置文件,命令如下:

  $ vim /usr/local/canal/conf/canal.properties
# ...
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下请将该值改大, 建议50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false

         mq 相关参数说明,如下所示:

参数名参数说明默认值
canal.mq.serverskafka为bootstrap.servers
rocketMQ中为nameserver列表
127.0.0.1:6667
canal.mq.retries发送失败重试次数0
canal.mq.batchSizekafka为ProducerConfig.BATCH_SIZE_CONFIG
rocketMQ无意义
16384
canal.mq.maxRequestSizekafka为ProducerConfig.MAX_REQUEST_SIZE_CONFIG
rocketMQ无意义
1048576
canal.mq.lingerMskafka为ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建议将该值调大, 如: 200
rocketMQ无意义
1
canal.mq.bufferMemorykafka为ProducerConfig.BUFFER_MEMORY_CONFIG
rocketMQ无意义
33554432
canal.mq.ackskafka为ProducerConfig.ACKS_CONFIG
rocketMQ无意义
all
canal.mq.kafka.kerberos.enablekafka为ProducerConfig.ACKS_CONFIG
rocketMQ无意义
false
canal.mq.kafka.kerberos.krb5FilePathkafka kerberos认证
rocketMQ无意义
../conf/kerberos/krb5.conf
canal.mq.kafka.kerberos.jaasFilePathkafka kerberos认证
rocketMQ无意义
../conf/kerberos/jaas.conf
canal.mq.producerGroupkafka无意义
rocketMQ为ProducerGroup名
Canal-Producer
canal.mq.accessChannelkafka无意义
rocketMQ为channel模式,如果为aliyun则配置为cloud
local
---------
canal.mq.vhost=rabbitMQ配置
canal.mq.exchange=rabbitMQ配置
canal.mq.username=rabbitMQ配置
canal.mq.password=rabbitMQ配置
canal.mq.aliyunuid=rabbitMQ配置
---------
canal.mq.canalBatchSize获取canal数据的批次大小50
canal.mq.canalGetTimeout获取canal数据的超时时间100
canal.mq.parallelThreadSizemq数据转换并行处理的并发度8
canal.mq.flatMessage是否为json格式
如果设置为false,对应MQ收到的消息为protobuf格式
需要通过CanalMessageDeserializer进行解码
false
---------
canal.mq.topicmq里的topic名
canal.mq.dynamicTopicmq里的动态topic规则, 1.1.3版本支持
canal.mq.partition单队列模式的分区下标,1
canal.mq.partitionsNum散列模式的分区数
canal.mq.partitionHash散列规则定义
库名.表名 : 唯一主键,比如mytest.person: id
1.1.3版本支持新语法,见下文

        canal.mq.dynamicTopic 表达式说明:

        Canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔

  • 例子1:test\\\\.test 指定匹配的单表,发送到以test_test为名字的topic上
  • 例子2:.*\\\\..* 匹配所有表,则每个表都会发送到各自表名的topic上
  • 例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
  • 例子4:test\\\\..* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
  • 例子5:test,test1\\\\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\\\\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值

        为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table

  • 例子1: test:test\\\\.test 指定匹配的单表,发送到以test为名字的topic上
  • 例子2: test:.*\\\\..* 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下
  • 例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下
  • 例子4:testA:test\\\\..* 指定匹配的表达式,针对匹配的表会发送到testA的topic下
  • 例子5:test0:test,test1:test1\\\\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\\\\.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值

        大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力。

        canal.mq.partitionHash 表达式说明

        Canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔

  • 例子1:test\\\\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
  • 例子2:.*\\\\..*:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
  • 例子3:.*\\\\..*:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
  • 例子4: 匹配规则啥都不写,则默认发到0这个partition上
  • 例子5:.*\\\\..* ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
    • 按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
  • 例子6: test\\\\.test:id,.\\\\..* , 针对test的表按照id散列,其余的表按照table散列

        注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)

        mq 顺序性问题

        binlog 本身是有序的,写入到mq之后如何保障顺序是很多人会比较关注,在issue里也有非常多人咨询了类似的问题,这里做一个统一的解答

  1. canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
  2. canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
  3. canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:
  • canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
  • canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
  • 单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
  • 多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
  • 单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性 ** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意

        MQ发送性能数据

        1.1.5版本可以在5k~50k左右,具体可参考:Canal-MQ-Performance

        6、启动

$ cd /usr/local/canal/
$ sh bin/startup.sh

        7、查看日志

        查看 logs/canal/canal.log,命令如下:

  $ vim logs/canal/canal.log

        查看instance的日志,命令如下:

  $ vim logs/example/example.log

        8、关闭

$ cd /usr/local/canal/
$ sh bin/stop.sh

        9、MQ数据消费

        canal.client下有对应的MQ数据消费的样例工程,包含数据编解码的功能

六、Canal Admin 快速启动

        canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作

        1、准备

        canal-admin 的限定依赖:

  1. MySQL,用于存储配置和节点等相关数据
  2. canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)

        2、部署

        下载 canal-admin, 访问 release 页面 , 选择需要的包下载, 如以 1.1.4 版本为例:

  $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz

        解压缩,命令如下:

$ mkdir /tmp/canal-admin
$ tar zxvf canal.admin-$version.tar.gz  -C /tmp/canal-admin

        解压完成后,进入 /tmp/canal 目录,可以看到如下结构:

drwxr-xr-x   6 agapple  staff   204B  8 31 15:37 bin
drwxr-xr-x   8 agapple  staff   272B  8 31 15:37 conf
drwxr-xr-x  90 agapple  staff   3.0K  8 31 15:37 lib
drwxr-xr-x   2 agapple  staff    68B  8 31 15:26 logs

        配置修改,命令如下:

vim conf/application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

        初始化元数据库,命令如下:

mysql -h127.1 -uroot -p

# 导入初始化SQL
> source conf/canal_manager.sql

        初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化 b. canal_manager.sql默认会在conf目录下,也可以通过链接下载 canal_manager.sql

        3、启动

  $ sh bin/startup.sh

        查看 admin 日志,命令如下:

$ vim logs/admin.log

2019-08-31 15:43:38.162 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8089 (http)
2019-08-31 15:43:38.180 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8089"]
2019-08-31 15:43:38.191 [main] INFO  org.apache.catalina.core.StandardService - Starting service [Tomcat]
2019-08-31 15:43:38.194 [main] INFO  org.apache.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/8.5.29
....
2019-08-31 15:43:39.789 [main] INFO  o.s.w.s.m.m.annotation.ExceptionHandlerExceptionResolver - Detected @ExceptionHandler methods in customExceptionHandler
2019-08-31 15:43:39.825 [main] INFO  o.s.b.a.web.servlet.WelcomePageHandlerMapping - Adding welcome page: class path resource [public/index.html]

        此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456,图示如下:

        4、关闭

  $ sh bin/stop.sh

        5、canal-server端配置

        使用canal_local.properties的配置覆盖canal.properties

# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =

        启动admin-server即可。

        或在启动命令中使用参数:sh bin/startup.sh local 指定配置。

        到此 Canal 实现 MySQL 数据实时同步方案介绍完成。

以上是关于ElasticSearch实战(四十七)-Canal 实现 MySQL 数据实时同步方案的主要内容,如果未能解决你的问题,请参考以下文章

四十七 Python分布式爬虫打造搜索引擎Scrapy精讲—elasticsearch(搜索引擎)用Django实现搜索的自动补全功能

tableau实战系列(四十七)-Tableau快速生成可视化视图

tableau实战系列(四十七)-Tableau快速生成可视化视图

ElasticSearch实战(四十一)-存储桶聚合

ElasticSearch实战(四十一)-存储桶聚合

ElasticSearch实战(十七)-个性化设置数据字段属性