canal+canal-admin+zookeeper集群搭建加python验证数据

Posted xiaoweichihuo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal+canal-admin+zookeeper集群搭建加python验证数据相关的知识,希望对你有一定的参考价值。

一 规划

canal集群涉及组件算比较多,高可用是通过主备方式,生产环境一般是两个canal-server,一个canal-admimn,两个canal-clicent 按需求可选,一个zookeeper集群,本次为学习使用,会简单安装并进行模拟验证,规划为:

ip用途端口
10.10.10.1mysql3306
10.10.10.2canal-admin8090
10.10.10.2zooleeper2181
10.10.10.3canal-server11111
10.10.10.4canal-server11111

所有软件安装目录为/data下
根据canal版本不同,需要java版本不同,本次实验安装为canal1.1.4,安装jdk-8u201即可,首先每台机器需要先预安装上java环境

wget https://repo.huaweicloud.com/java/jdk/8u201-b09/jdk-8u201-linux-x64.tar.gz
tar -zxvf jdk-8u201-linux-x64.tar.gz
mv jdk1.8.0_201 /usr/local/jdk1.8/

vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.8/
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH

source /etc/profile

二 安装mysql,并创建使用账号

安装mysql过程可参考网上文章,需要再mysql创建canal账号,机器ip10.10.10.1

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

二 安装canal-admin,zooleeper

  1. 安装canal-admin,机器ip10.10.10.2
    下载安装包
midir -p /data
cd /data
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

data目录下解压缩安装

mkdir canal-admin
tar zxvf canal.admin-$version.tar.gz  -C canal-admin

修改服务配置文件

cd canal-admin
vi conf/application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 10.10.10.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://$spring.datasource.address/$spring.datasource.database?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

主要修改项:
address: 10.10.10.1:3306
database: canal_manager
username: canal
password: canal
全是服务数据库相关内容

canal-admin元数据信息需要一个mysql库存储,在mysql上新建canal_manager库并给用户canal授权,这个库单纯是canal-admin使用

create database canal_manager;
grant all privileges on canal_manager.* to canal;

然后初始化元数据脚本,脚本在/data/canal-admin/conf中canal_manager.sql
source canal_manager.sql
修改字段,这一步可能是这个版本问题,在插入数据时候modified_time字段插入了null值
ALTER TABLE canal_manager.canal_node_server
MODIFY modified_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
ALTER TABLE canal_manager.canal_adapter_config
MODIFY modified_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
ALTER TABLE canal_manager.canal_cluster
MODIFY modified_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
ALTER TABLE canal_manager.canal_config
MODIFY modified_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
ALTER TABLE canal_manager.canal_instance_config
MODIFY modified_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
ALTER TABLE canal_manager.canal_user
MODIFY creation_date timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;

启动服务

cd /data/canal-admin/bin
sh startup.sh

此时可以登录wen页面进行查看已经成功启动
地址 http://10.10.10.2:8089/index.html
默认用户密码 admin 123456

  1. 安装zookeeper,机器ip10.10.10.2
    生产上一般安装集群,这里实验只单机安装,并与canal-admin安装在相同机器上了
    下载安装
 cd /data
 wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
tar zxvf apache-zookeeper-3.5.8-bin.tar.gz 
mv apache-zookeeper-3.5.8-bin zookeeper
ln -s /data/zookeeper /usr/local/zookeeper

创建数据目录

mkdir -p /data/zookeeper/data

修改配置文件

cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
clientPort=2181

主要修改项:
dataDir=/data/zookeeper/data

环境变量设置

vi /etc/profile
export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH

source /etc/profile

启动zookeeper

zkServer.sh start

相关命令

jps                                            ## 查看进程
zkServer.sh status                    ## 查看状态
zkServer.sh stop

设置开机启动

vi /etc/rc.d/rc.local
su - root -c zkServer.sh start

现在admin和zk都已经安装好了 可以先建好集群,安装canal后直接加入就可以了
在服务页面登录后选择集群管理新建集群,记住集群名字canalzk后面要用

三 安装canal-server

下载安装,机器ip 10.10.10.3

cd /data
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
mkdir /data/canal
tar zxvf canal.deployer-1.1.4.tar.gz -C /data/canal

一个canal软件可以启用多个实例,每个实例类似一个同步配置,如果是单机的话需要修改的配置文件是,conf/example/instance.properties,这个是实例的配置文件,example一般可以当做实例名,可以根据需要复制修改整个目录,新建其他实例,conf/canal.properties为软件的配置文件,基本不需要修改,这部分可以查看另一篇文章,单机canal搭建。
集群的话配置文件一般在admin上统一配置,实例的配置也是,本地只需要配置canal_local.properties一些基础信息,通过此配置文件启动为集群模式

cd canal
vi conf/canal_local.properties
# register ip
canal.register.ip = 10.10.10.3
# canal server注册到外部zookeeper、admin的ip信息
canal.port = 11111
# canal server 的metrics 端口
# canal admin config
canal.admin.manager = 10.10.10.2:8089
# canal-admin服务ip和端口
canal.admin.port = 11110
# admin端口,canal 1.1.4版本新增的能力,会在canal-server上提供远程管理操作,默认值11110
canal.admin.user = admin
# canal admin 应用下 canal.adminUser 的值
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# canal admin 应用下 canal.adminPasswd 下的值
# admin auto register
canal.admin.register.auto = true
# 是否开启自动注册模式
canal.admin.register.cluster =
canal.admin.register.cluster = canalzk
# 可以指定默认注册的集群名,如果不指定,默认注册为单机模式,集群的名字需要在 canal admin上存在
canal.admin.register.name = canal01
# 注册到 canal admin 上server的名字,唯一有意义即可

按注释进行修改后,在另一台机器上进行同样的操作,修改参数时候注意修改对应内容,
分别在两台机器上启动canal-server服务即可自动添加到集群了,需要通过指定配置文件启动

sh startup.sh local /data/canal/conf/canal_local.properties

这时候在admin服务web页面集群管理里可以看到已经有两个服务了,但是最好先配置好集群配置信息后再起来

四 集群配置部分

然后再集群进行canal的配置管理和新建canal实例操作了
先在集群管理选中集群进行主配置修改,直接引入摸版后修改,只列出修改部分
1.修改集群配置

可以注释掉,需要的话自己设置,并且server内配置已经有了
# canal.user = canal
# canal.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# canal.admin.manager = 10.181.0.154:8089
# canal.admin.port = 11110
# canal.admin.user = admin
# canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
zk地址,集群逗号分割
canal.zkServers = 10.10.10.2:2181
服务链接模式,按需要选 tcp, kafka, RocketMQ 
canal.serverMode = tcp
存放地址重要
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

具体参数比较多,如果后面接入kafka或者RocketMQ清查文档进行调整,其中比较主要参数canal.instance.global.spring.xml

配置好集群信息后可以启动两个server服务了,在web页面就可以进行管理操作了

2。新建实例
在页面选择instance管理选择新建实例选项
配置文件基本和单机配置一样
主要修改部分

同步数据库地址和点位信息,不写则从当前开始同步
canal.instance.master.address=10.10.10.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
数据库链接信息,用上面准备好的同步用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
同步过滤规则
canal.instance.filter.regex=crm\\\\..*

查看全部包含“mysql 数据解析关注的表,Perl正则表达式. 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 常见例⼦:

  1. 所有表:.* or .\\…
  2. canal schema下所有表: canal\\…*
  3. canal下的以canal打头的表:canal\\.canal.*
  4. canal schema下的⼀张表:canal.test1
  5. 多个规则组合使⽤:canal\\…*,mysql.test1,mysql.test2 (逗号分隔) 注意:此过滤条件只针对row模式的数据有效(ps.
    mixed/statement因为不解析sql,所以⽆法准确提取tableName进⾏过滤”的文档

配置完后可以启动了,在界面可以看到实例和服务都已经启动了,在zk机器上可以查看现在提供服务的canal-server是哪台

./zkCli.sh
get /otter/canal/destinations/crm/running
"active":true,"address":"10.10.10.3:11111"
查看点位记录
get /otter/canal/destinations/instance的名字/1001/cursor

集群模式下只有一台canal会进行同步,另一台为挂起状态,在zk中会有同步机器信息,并如果同步机器挂了后会启动另一台canal进行同步达到高可用。

四 验证部分

1.可以用python编写监听脚本进行接收验证canal高可用和同步是否成功
脚本如下,需要安装模块较多,不分对版本有要求,解决后未记录,可在网上查看

```bash
from canal.client import Client
from canal.protocol import EntryProtocol_pb2
from canal.protocol import CanalProtocol_pb2
import time
from kazoo.client import KazooClient
import sys
import json


class CanalUtils():
    def __init__(self,host,port):
        # 建立与canal服务端的连接
        self.client = Client()
        self.client.connect(host=host, port=port)  # canal服务端部署的主机IP与端口
        self.client.check_valid(username=b'canal', password=b'canal')  # 自行填写配置的数据库账户密码
        self.client.subscribe(client_id=b'1001', destination=b'crm', filter=b'.*\\\\..*')

    def get_canal_change(self):
        while True:
            message = self.client.get(100)
            entries = message['entries']
            for entry in entries:
                entry_type = entry.entryType
                if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN,
                                  EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
                    continue
                row_change = EntryProtocol_pb2.RowChange()
                row_change.MergeFromString(entry.storeValue)
                event_type = row_change.eventType
                header = entry.header
                database = header.schemaName
                table = header.tableName
                event_type = header.eventType
                for row in row_change.rowDatas:
                    format_data = dict()
                    if event_type == EntryProtocol_pb2.EventType.DELETE:
                        for column in row.beforeColumns:
                            format_data[column.name] = column.value
                    elif event_type == EntryProtocol_pb2.EventType.INSERT:
                        for column in row.afterColumns:
                            format_data[column.name] = column.value
                    else:
                        format_data['before'] = format_data['after'] = dict()
                        for column in row.beforeColumns:
                            format_data['before'][column.name] = column.value
                        for column in row.afterColumns:
                            format_data['after'][column.name] = column.value
                    data = dict(
                        db=database,
                        table=table,
                        event_type=event_type,
                        data=format_data,
                    )
                    print(data)
            time.sleep(1)

# app = CanalUtils('10.181.0.149',11111)
# app.get_canal_change()


try:
    nodePath = "/otter/canal/destinations/crm/running"
    host = "10.10.10.2"
    port = "2181"
    timeout = 100
    zkc = KazooClient(hosts=host + ':' + port, timeout=timeout)
    zkc.start()


    dataAndStat = zkc.get(nodePath)
    data = json.loads(dataAndStat[0].decode('utf-8'))['address']
    host = data.split(':')[0]
    port = data.split(':')[1]
    print(host)
    zkc.stop()
    zkc.close()

    app = CanalUtils(host,int(port))
    app.get_canal_change()
except Exception as err:
    print(err)

2.同步验证
启动脚本后对crm数据库下表数剧进行增删改查可以看到监听信息

Auth succed
Subscribe succed
'db': 'crm', 'table': 'web_dap', 'event_type': 1, 'data': 'id': '6', 'title': '测试', 'level': '1', 'create_time': '0000-00-00 00:00:00.000000', 'customer_id': '1'
'db': 'crm', 'table': 'web_dap', 'event_type': 2, 'data': 'before': 'id': '6', 'title': '哈哈哈', 'level': '1', 'create_time': '0000-00-00 00:00:00.000000', 'customer_id': '1', 'after': 'id': '6', 'title': '哈哈哈', 'level': '1', 'create_time': '0000-00-00 00:00:00.000000', 'customer_id': '1'
'db': 'crm', 'table': 'web_dap', 'event_type': 3, 'data': 'id': '6', 'title': '哈哈哈', 'level': '1', 'create_time': '0000-00-00 00:00:00.000000', 'customer_id': '1'

3.高可用验证
查看当前提供服务的canal
get /otter/canal/destinations/crm/running
“active”:true,“address”:“10.10.10.3:11111”

关闭这个节点后再查看
get /otter/canal/destinations/crm/running
“active”:true,“address”:“10.10.10.4:11111”

在启动脚本进行验证同步

以上是关于canal+canal-admin+zookeeper集群搭建加python验证数据的主要内容,如果未能解决你的问题,请参考以下文章

Canal实时同步MySQL数据至Kafka集群,安装部署

canal实现同步mysql至es

canal 源码解析系列-工程结构说明

Mysql + canal + zookeeper环境搭建

canal 环境搭建 kafka Zookeeper安装

canal挂载在zookeeper,java客户端client,连接报超时