canal+canal-admin+zookeeper集群搭建加python验证数据
Posted xiaoweichihuo
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal+canal-admin+zookeeper集群搭建加python验证数据相关的知识,希望对你有一定的参考价值。
一 规划
canal集群涉及组件算比较多,高可用是通过主备方式,生产环境一般是两个canal-server,一个canal-admimn,两个canal-clicent 按需求可选,一个zookeeper集群,本次为学习使用,会简单安装并进行模拟验证,规划为:
ip | 用途 | 端口 |
---|---|---|
10.10.10.1 | mysql | 3306 |
10.10.10.2 | canal-admin | 8090 |
10.10.10.2 | zooleeper | 2181 |
10.10.10.3 | canal-server | 11111 |
10.10.10.4 | canal-server | 11111 |
所有软件安装目录为/data下
根据canal版本不同,需要java版本不同,本次实验安装为canal1.1.4,安装jdk-8u201即可,首先每台机器需要先预安装上java环境
wget https://repo.huaweicloud.com/java/jdk/8u201-b09/jdk-8u201-linux-x64.tar.gz
tar -zxvf jdk-8u201-linux-x64.tar.gz
mv jdk1.8.0_201 /usr/local/jdk1.8/
vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.8/
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
source /etc/profile
二 安装mysql,并创建使用账号
安装mysql过程可参考网上文章,需要再mysql创建canal账号,机器ip10.10.10.1
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
二 安装canal-admin,zooleeper
- 安装canal-admin,机器ip10.10.10.2
下载安装包
midir -p /data
cd /data
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
data目录下解压缩安装
mkdir canal-admin
tar zxvf canal.admin-$version.tar.gz -C canal-admin
修改服务配置文件
cd canal-admin
vi conf/application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 10.10.10.1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://$spring.datasource.address/$spring.datasource.database?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
主要修改项:
address: 10.10.10.1:3306
database: canal_manager
username: canal
password: canal
全是服务数据库相关内容
canal-admin元数据信息需要一个mysql库存储,在mysql上新建canal_manager库并给用户canal授权,这个库单纯是canal-admin使用
create database canal_manager;
grant all privileges on canal_manager.* to canal;
然后初始化元数据脚本,脚本在/data/canal-admin/conf中canal_manager.sql
source canal_manager.sql
修改字段,这一步可能是这个版本问题,在插入数据时候modified_time字段插入了null值
ALTER TABLE canal_manager.canal_node_server
MODIFY modified_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
ALTER TABLE canal_manager.canal_adapter_config
MODIFY modified_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
ALTER TABLE canal_manager.canal_cluster
MODIFY modified_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
ALTER TABLE canal_manager.canal_config
MODIFY modified_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
ALTER TABLE canal_manager.canal_instance_config
MODIFY modified_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
ALTER TABLE canal_manager.canal_user
MODIFY creation_date timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
启动服务
cd /data/canal-admin/bin
sh startup.sh
此时可以登录wen页面进行查看已经成功启动
地址 http://10.10.10.2:8089/index.html
默认用户密码 admin 123456
- 安装zookeeper,机器ip10.10.10.2
生产上一般安装集群,这里实验只单机安装,并与canal-admin安装在相同机器上了
下载安装
cd /data
wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
tar zxvf apache-zookeeper-3.5.8-bin.tar.gz
mv apache-zookeeper-3.5.8-bin zookeeper
ln -s /data/zookeeper /usr/local/zookeeper
创建数据目录
mkdir -p /data/zookeeper/data
修改配置文件
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
clientPort=2181
主要修改项:
dataDir=/data/zookeeper/data
环境变量设置
vi /etc/profile
export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH
source /etc/profile
启动zookeeper
zkServer.sh start
相关命令
jps ## 查看进程
zkServer.sh status ## 查看状态
zkServer.sh stop
设置开机启动
vi /etc/rc.d/rc.local
su - root -c zkServer.sh start
现在admin和zk都已经安装好了 可以先建好集群,安装canal后直接加入就可以了
在服务页面登录后选择集群管理新建集群,记住集群名字canalzk后面要用
三 安装canal-server
下载安装,机器ip 10.10.10.3
cd /data
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
mkdir /data/canal
tar zxvf canal.deployer-1.1.4.tar.gz -C /data/canal
一个canal软件可以启用多个实例,每个实例类似一个同步配置,如果是单机的话需要修改的配置文件是,conf/example/instance.properties,这个是实例的配置文件,example一般可以当做实例名,可以根据需要复制修改整个目录,新建其他实例,conf/canal.properties为软件的配置文件,基本不需要修改,这部分可以查看另一篇文章,单机canal搭建。
集群的话配置文件一般在admin上统一配置,实例的配置也是,本地只需要配置canal_local.properties一些基础信息,通过此配置文件启动为集群模式
cd canal
vi conf/canal_local.properties
# register ip
canal.register.ip = 10.10.10.3
# canal server注册到外部zookeeper、admin的ip信息
canal.port = 11111
# canal server 的metrics 端口
# canal admin config
canal.admin.manager = 10.10.10.2:8089
# canal-admin服务ip和端口
canal.admin.port = 11110
# admin端口,canal 1.1.4版本新增的能力,会在canal-server上提供远程管理操作,默认值11110
canal.admin.user = admin
# canal admin 应用下 canal.adminUser 的值
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# canal admin 应用下 canal.adminPasswd 下的值
# admin auto register
canal.admin.register.auto = true
# 是否开启自动注册模式
canal.admin.register.cluster =
canal.admin.register.cluster = canalzk
# 可以指定默认注册的集群名,如果不指定,默认注册为单机模式,集群的名字需要在 canal admin上存在
canal.admin.register.name = canal01
# 注册到 canal admin 上server的名字,唯一有意义即可
按注释进行修改后,在另一台机器上进行同样的操作,修改参数时候注意修改对应内容,
分别在两台机器上启动canal-server服务即可自动添加到集群了,需要通过指定配置文件启动
sh startup.sh local /data/canal/conf/canal_local.properties
这时候在admin服务web页面集群管理里可以看到已经有两个服务了,但是最好先配置好集群配置信息后再起来
四 集群配置部分
然后再集群进行canal的配置管理和新建canal实例操作了
先在集群管理选中集群进行主配置修改,直接引入摸版后修改,只列出修改部分
1.修改集群配置
可以注释掉,需要的话自己设置,并且server内配置已经有了
# canal.user = canal
# canal.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# canal.admin.manager = 10.181.0.154:8089
# canal.admin.port = 11110
# canal.admin.user = admin
# canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
zk地址,集群逗号分割
canal.zkServers = 10.10.10.2:2181
服务链接模式,按需要选 tcp, kafka, RocketMQ
canal.serverMode = tcp
存放地址重要
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
具体参数比较多,如果后面接入kafka或者RocketMQ清查文档进行调整,其中比较主要参数canal.instance.global.spring.xml
配置好集群信息后可以启动两个server服务了,在web页面就可以进行管理操作了
2。新建实例
在页面选择instance管理选择新建实例选项
配置文件基本和单机配置一样
主要修改部分
同步数据库地址和点位信息,不写则从当前开始同步
canal.instance.master.address=10.10.10.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
数据库链接信息,用上面准备好的同步用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
同步过滤规则
canal.instance.filter.regex=crm\\\\..*
查看全部包含“mysql 数据解析关注的表,Perl正则表达式. 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 常见例⼦:
- 所有表:.* or .\\…
- canal schema下所有表: canal\\…*
- canal下的以canal打头的表:canal\\.canal.*
- canal schema下的⼀张表:canal.test1
- 多个规则组合使⽤:canal\\…*,mysql.test1,mysql.test2 (逗号分隔) 注意:此过滤条件只针对row模式的数据有效(ps.
mixed/statement因为不解析sql,所以⽆法准确提取tableName进⾏过滤”的文档
配置完后可以启动了,在界面可以看到实例和服务都已经启动了,在zk机器上可以查看现在提供服务的canal-server是哪台
./zkCli.sh
get /otter/canal/destinations/crm/running
"active":true,"address":"10.10.10.3:11111"
查看点位记录
get /otter/canal/destinations/instance的名字/1001/cursor
集群模式下只有一台canal会进行同步,另一台为挂起状态,在zk中会有同步机器信息,并如果同步机器挂了后会启动另一台canal进行同步达到高可用。
四 验证部分
1.可以用python编写监听脚本进行接收验证canal高可用和同步是否成功
脚本如下,需要安装模块较多,不分对版本有要求,解决后未记录,可在网上查看
```bash
from canal.client import Client
from canal.protocol import EntryProtocol_pb2
from canal.protocol import CanalProtocol_pb2
import time
from kazoo.client import KazooClient
import sys
import json
class CanalUtils():
def __init__(self,host,port):
# 建立与canal服务端的连接
self.client = Client()
self.client.connect(host=host, port=port) # canal服务端部署的主机IP与端口
self.client.check_valid(username=b'canal', password=b'canal') # 自行填写配置的数据库账户密码
self.client.subscribe(client_id=b'1001', destination=b'crm', filter=b'.*\\\\..*')
def get_canal_change(self):
while True:
message = self.client.get(100)
entries = message['entries']
for entry in entries:
entry_type = entry.entryType
if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN,
EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
continue
row_change = EntryProtocol_pb2.RowChange()
row_change.MergeFromString(entry.storeValue)
event_type = row_change.eventType
header = entry.header
database = header.schemaName
table = header.tableName
event_type = header.eventType
for row in row_change.rowDatas:
format_data = dict()
if event_type == EntryProtocol_pb2.EventType.DELETE:
for column in row.beforeColumns:
format_data[column.name] = column.value
elif event_type == EntryProtocol_pb2.EventType.INSERT:
for column in row.afterColumns:
format_data[column.name] = column.value
else:
format_data['before'] = format_data['after'] = dict()
for column in row.beforeColumns:
format_data['before'][column.name] = column.value
for column in row.afterColumns:
format_data['after'][column.name] = column.value
data = dict(
db=database,
table=table,
event_type=event_type,
data=format_data,
)
print(data)
time.sleep(1)
# app = CanalUtils('10.181.0.149',11111)
# app.get_canal_change()
try:
nodePath = "/otter/canal/destinations/crm/running"
host = "10.10.10.2"
port = "2181"
timeout = 100
zkc = KazooClient(hosts=host + ':' + port, timeout=timeout)
zkc.start()
dataAndStat = zkc.get(nodePath)
data = json.loads(dataAndStat[0].decode('utf-8'))['address']
host = data.split(':')[0]
port = data.split(':')[1]
print(host)
zkc.stop()
zkc.close()
app = CanalUtils(host,int(port))
app.get_canal_change()
except Exception as err:
print(err)
2.同步验证
启动脚本后对crm数据库下表数剧进行增删改查可以看到监听信息
Auth succed
Subscribe succed
'db': 'crm', 'table': 'web_dap', 'event_type': 1, 'data': 'id': '6', 'title': '测试', 'level': '1', 'create_time': '0000-00-00 00:00:00.000000', 'customer_id': '1'
'db': 'crm', 'table': 'web_dap', 'event_type': 2, 'data': 'before': 'id': '6', 'title': '哈哈哈', 'level': '1', 'create_time': '0000-00-00 00:00:00.000000', 'customer_id': '1', 'after': 'id': '6', 'title': '哈哈哈', 'level': '1', 'create_time': '0000-00-00 00:00:00.000000', 'customer_id': '1'
'db': 'crm', 'table': 'web_dap', 'event_type': 3, 'data': 'id': '6', 'title': '哈哈哈', 'level': '1', 'create_time': '0000-00-00 00:00:00.000000', 'customer_id': '1'
3.高可用验证
查看当前提供服务的canal
get /otter/canal/destinations/crm/running
“active”:true,“address”:“10.10.10.3:11111”
关闭这个节点后再查看
get /otter/canal/destinations/crm/running
“active”:true,“address”:“10.10.10.4:11111”
在启动脚本进行验证同步
以上是关于canal+canal-admin+zookeeper集群搭建加python验证数据的主要内容,如果未能解决你的问题,请参考以下文章