Canal或maxwell实时采集MySQL数据到Kafka
Posted 小基基o_O
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Canal或maxwell实时采集MySQL数据到Kafka相关的知识,希望对你有一定的参考价值。
文章目录
mysql的binlog
- 二进制日志
- 以事件形式记录了所有的DDL和DML,还包含语句所执行的消耗的时间
- MySQL的binlog是事务安全型的
- 开启binlog会有1%左右的性能损耗
- 一般场景:MySQL主从复制
MySQL主从复制原理图
1、编辑MySQL配置
sudo find / -name my.cnf
sudo vim /etc/my.cnf
添加如下内容
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=b1
参数 | 说明 |
---|---|
server-id | MySQL主从复制时,主从之间每个实例都有独一无二的ID |
log-bin | 生成的日志文件的前缀 |
binlog_format | binlog格式 |
binlog-do-db | 指定哪些库需要写到binlog;如果不配置,就会是所有库 |
binlog_format 参数值 | 参数值说明 | 空间占用 | 数据一致性 |
---|---|---|---|
statement | 语句级:记录每次一执行写操作的语句 | 小 | 如果用binlog进行数据恢复,执行时间不同可能会导致数据不一致 |
row | 行级:记录每次操作后每行记录的变化 | 大 | 绝对支持 |
mixed | statement 的升级版 | 小 | 极端情况下仍会造成数据不一致 |
2、重启MySQL
sudo systemctl restart mysqld
3、检测配置是否成功
mysql -uroot -p123456
show variables like'%log_bin%';
查看
log_bin
相关变量
sudo ls -l /var/lib/mysql | grep mysql-bin
查看binlog文件
4、MySQL数据准备
被监控的MySQL数据
DROP DATABASE IF EXISTS b1;
CREATE DATABASE b1;
CREATE TABLE b1.t11 (f1 INT PRIMARY KEY,f2 INT);
CREATE TABLE b1.t22 (f1 INT PRIMARY KEY,f2 INT);
INSERT b1.t11 VALUES (1,10);
INSERT b1.t22 VALUES (1,10);
插入数据(数据同步时才插入)
INSERT b1.t11 VALUES (2,20),(3,NULL),(4,40);
INSERT b1.t22 VALUES (2,20),(3,NULL),(4,40);
5、Kafka准备
创建Kafka的主题
kafka-topics.sh \\
--delete \\
--zookeeper hadoop102:2181/kafka \\
--topic t1
kafka-topics.sh \\
--create \\
--zookeeper hadoop102:2181/kafka \\
--replication-factor 1 \\
--partitions 2 \\
--topic t1
起一个终端消费者,用于检测MySQL数据是否同步到Kafka
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic t1
Canal
- Canal是Java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件
- 使用场景:实时采集MySQL数据到Kafka
- 工作原理:
1、Canal伪装自己为MySQL的Slave,向MySQL的Master发送dump协议
2、MySQL的Master收到dump请求,推送binary log给Canal
3、Canal解析binary log
1、下载Canal
下载地址:https://github.com/alibaba/canal/releases
wget http://github.com/alibaba/canal/releases/download/canal-1.1.6-alpha-1/canal.deployer-1.1.6-SNAPSHOT.tar.gz
2、解压
mkdir $B_HOME/canal
tar -zxvf canal.deployer-1.1.6-SNAPSHOT.tar.gz -C $B_HOME/canal/
cd $B_HOME/canal
3、修改配置
Canal配置有2种级别:server级别和instance级别
server级别:全局性的配置,一个sever可配置多个instance
instance级别:最小订阅MySQL
cd $B_HOME/canal
server配置
vim ./conf/canal.properties
# Canal服务器绑定ip地址和端口
canal.ip = hadoop102
# 服务模式:tcp、kafka、RocketMQ…
canal.serverMode = kafka
# Kafka地址
kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
instance配置
canal.destinations
可配置多个实例,默认有个example
vim ./conf/example/instance.properties
# 要监控的MySQL地址
canal.instance.master.address=hadoop102:3306
# 监控哪些【库.表】(转义符需要双斜杠)
canal.instance.filter.regex=b1\\\\.t1.*
# 消息队列的主题
canal.mq.topic=t1
# 散列模式的分区数, 要和Kafka主题的分区数保持一致
canal.mq.partitionsNum=2
实例默认的数据库用户及其密码都是canal
,所以创建一个同名用户
mysql -uroot -p123456
-- 创建用户canal,%表示所有地址可访问,密码canal,并授予所有库和表的指定权限
GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* to 'canal'@'%' identified by 'canal';
4、启动
./bin/startup.sh
启动后 用
jps
可看到CanalLauncher
Maxwell
Java编写的MySQL数据实时抓取软件,其抓取原理基于binlog
1、准备工作
mysql -uroot -p123456
- 在MySQL中创建一个用于存储maxwell元数据的数据库
CREATE DATABASE `maxwell` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
- 创建可以操作maxwell库的用户,名为
maxwell
密码为maxwell
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY 'maxwell';
- 给用户
maxwell
分配操作其他数据库的权限
GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO 'maxwell'@'%';
FLUSH PRIVILEGES;
2、安装和配置maxwell
下载
wget http://github.com/zendesk/maxwell/releases/download/v1.27.1/maxwell-1.27.1.tar.gz
解压
tar -zxvf maxwell-1.27.1.tar.gz -C $B_HOME/
配置
cd $B_HOME/maxwell-1.27.1
vim ./config.properties
# 日志级别
log_level=info
# 生产者
producer=kafka
# Kafka服务地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# Kafka主题
kafka_topic=t1
# 生产者分区依据:按照主键的hash
producer_partition_by=primary_key
# MySQL登录信息
host=hadoop102
user=maxwell
password=maxwell
启动maxwell
cd $B_HOME/maxwell-1.27.1
./bin/maxwell --config config.properties --filter exclude:b1.t22
可选参数 | 说明 |
---|---|
--config | 配置文件 |
--filter | 指定监控的表,建议传参(有语法提示) |
--daemon | 后台运行 |
比较Maxwell和Canal
1、Canal功能更强大、技术更成熟;Maxwell更轻
2、Maxwell不能直接支持高可用,Canal支持
3、Maxwell只支持JSON格式,Canal可自定义格式
输出格式比较
4、Maxwell可导出完整的历史数据用于初始化,步骤如下:
·4.1、全局配置加入client_id
vim $B_HOME/maxwell-1.27.1/config.properties
# 用于初始化维度表数据
client_id=maxwell_1
·4.2、执行初始化脚本(先重启maxwell,再执行该命令)
cd $B_HOME/maxwell-1.27.1/
./bin/maxwell-bootstrap \\
--user maxwell \\
--password maxwell \\
--host hadoop102 \\
--database b1 \\
--table t11 \\
--client_id maxwell_1
Appendix
en | 🔉 | cn |
---|---|---|
canal | kəˈnæl | n. 运河;[地理] 水道;[建] 管道;灌溉水渠 |
maxwell | ˈmæksˌwɛl | n. 麦克斯韦(磁通量单位) |
dump | dʌmp | v. 丢弃;倾销;转存(计算机数据);n. 垃圾场; |
grant | ɡrænt | v. 授予;承认n. 补助金; |
bootstrap | ˈbuːtstræp | n. (靴筒后的)靴襻;[计] 引导程序,辅助程序 |
以上是关于Canal或maxwell实时采集MySQL数据到Kafka的主要内容,如果未能解决你的问题,请参考以下文章
大数据之实时数据源同步中间件--生产上Canal与Maxwell颠峰对决
spring boot使用mysql-binlog-connector-java解析mysql binlog日志(实时+离线)