Canal或maxwell实时采集MySQL数据到Kafka

Posted 小基基o_O

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Canal或maxwell实时采集MySQL数据到Kafka相关的知识,希望对你有一定的参考价值。

文章目录

mysql的binlog

  • 二进制日志
  • 以事件形式记录了所有的DDL和DML,还包含语句所执行的消耗的时间
  • MySQL的binlog是事务安全型的
  • 开启binlog会有1%左右的性能损耗
  • 一般场景:MySQL主从复制

MySQL主从复制原理图

1、编辑MySQL配置

sudo find / -name my.cnf
sudo vim /etc/my.cnf

添加如下内容

server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=b1
参数说明
server-idMySQL主从复制时,主从之间每个实例都有独一无二的ID
log-bin生成的日志文件的前缀
binlog_formatbinlog格式
binlog-do-db指定哪些库需要写到binlog;如果不配置,就会是所有库
binlog_format参数值参数值说明空间占用数据一致性
statement语句级:记录每次一执行写操作的语句如果用binlog进行数据恢复,执行时间不同可能会导致数据不一致
row行级:记录每次操作后每行记录的变化绝对支持
mixedstatement的升级版极端情况下仍会造成数据不一致

2、重启MySQL

sudo systemctl restart mysqld

3、检测配置是否成功

mysql -uroot -p123456
show variables like'%log_bin%';

查看log_bin相关变量

sudo ls -l /var/lib/mysql | grep mysql-bin

查看binlog文件

4、MySQL数据准备

被监控的MySQL数据

DROP DATABASE IF EXISTS b1;
CREATE DATABASE b1;
CREATE TABLE b1.t11 (f1 INT PRIMARY KEY,f2 INT);
CREATE TABLE b1.t22 (f1 INT PRIMARY KEY,f2 INT);
INSERT b1.t11 VALUES (1,10);
INSERT b1.t22 VALUES (1,10);

插入数据(数据同步时才插入)

INSERT b1.t11 VALUES (2,20),(3,NULL),(4,40);
INSERT b1.t22 VALUES (2,20),(3,NULL),(4,40);

5、Kafka准备

创建Kafka的主题

kafka-topics.sh \\
--delete \\
--zookeeper hadoop102:2181/kafka \\
--topic t1

kafka-topics.sh \\
--create \\
--zookeeper hadoop102:2181/kafka \\
--replication-factor 1 \\
--partitions 2 \\
--topic t1

起一个终端消费者,用于检测MySQL数据是否同步到Kafka

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic t1

Canal

  • Canal是Java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件
  • 使用场景:实时采集MySQL数据到Kafka
  • 工作原理:
    1、Canal伪装自己为MySQL的Slave,向MySQL的Master发送dump协议
    2、MySQL的Master收到dump请求,推送binary log给Canal
    3、Canal解析binary log

1、下载Canal

下载地址:https://github.com/alibaba/canal/releases

wget http://github.com/alibaba/canal/releases/download/canal-1.1.6-alpha-1/canal.deployer-1.1.6-SNAPSHOT.tar.gz

2、解压

mkdir $B_HOME/canal
tar -zxvf canal.deployer-1.1.6-SNAPSHOT.tar.gz -C $B_HOME/canal/
cd $B_HOME/canal

3、修改配置

Canal配置有2种级别:server级别和instance级别
server级别:全局性的配置,一个sever可配置多个instance
instance级别:最小订阅MySQL

cd $B_HOME/canal

server配置

vim ./conf/canal.properties
# Canal服务器绑定ip地址和端口
canal.ip = hadoop102
# 服务模式:tcp、kafka、RocketMQ…
canal.serverMode = kafka
# Kafka地址
kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

instance配置

canal.destinations可配置多个实例,默认有个example

vim ./conf/example/instance.properties
# 要监控的MySQL地址
canal.instance.master.address=hadoop102:3306
# 监控哪些【库.表】(转义符需要双斜杠)
canal.instance.filter.regex=b1\\\\.t1.*

# 消息队列的主题
canal.mq.topic=t1
# 散列模式的分区数, 要和Kafka主题的分区数保持一致
canal.mq.partitionsNum=2

实例默认的数据库用户及其密码都是canal,所以创建一个同名用户

mysql -uroot -p123456
-- 创建用户canal,%表示所有地址可访问,密码canal,并授予所有库和表的指定权限
GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* to 'canal'@'%' identified by 'canal';

4、启动

./bin/startup.sh

启动后 用jps 可看到CanalLauncher

Maxwell

Java编写的MySQL数据实时抓取软件,其抓取原理基于binlog

1、准备工作

mysql -uroot -p123456
  1. 在MySQL中创建一个用于存储maxwell元数据的数据库
CREATE DATABASE `maxwell` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
  1. 创建可以操作maxwell库的用户,名为maxwell密码为maxwell
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY 'maxwell';
  1. 给用户maxwell分配操作其他数据库的权限
GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO 'maxwell'@'%';
FLUSH PRIVILEGES;

2、安装和配置maxwell

下载

wget http://github.com/zendesk/maxwell/releases/download/v1.27.1/maxwell-1.27.1.tar.gz

解压

tar -zxvf maxwell-1.27.1.tar.gz -C $B_HOME/

配置

cd $B_HOME/maxwell-1.27.1
vim ./config.properties
# 日志级别
log_level=info
# 生产者
producer=kafka
# Kafka服务地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# Kafka主题
kafka_topic=t1
# 生产者分区依据:按照主键的hash
producer_partition_by=primary_key

# MySQL登录信息
host=hadoop102
user=maxwell
password=maxwell

启动maxwell

cd $B_HOME/maxwell-1.27.1
./bin/maxwell --config config.properties --filter exclude:b1.t22
可选参数说明
--config配置文件
--filter指定监控的表,建议传参(有语法提示)
--daemon后台运行

比较Maxwell和Canal

1、Canal功能更强大、技术更成熟;Maxwell更轻
2、Maxwell不能直接支持高可用,Canal支持
3、Maxwell只支持JSON格式,Canal可自定义格式

输出格式比较

4、Maxwell可导出完整的历史数据用于初始化,步骤如下:

·4.1、全局配置加入client_id

vim $B_HOME/maxwell-1.27.1/config.properties
# 用于初始化维度表数据
client_id=maxwell_1

·4.2、执行初始化脚本(先重启maxwell,再执行该命令)

cd $B_HOME/maxwell-1.27.1/
./bin/maxwell-bootstrap \\
--user maxwell \\
--password maxwell \\
--host hadoop102 \\
--database b1 \\
--table t11 \\
--client_id maxwell_1

Appendix

en🔉cn
canalkəˈnæln. 运河;[地理] 水道;[建] 管道;灌溉水渠
maxwellˈmæksˌwɛln. 麦克斯韦(磁通量单位)
dumpdʌmpv. 丢弃;倾销;转存(计算机数据);n. 垃圾场;
grantɡræntv. 授予;承认n. 补助金;
bootstrapˈbuːtstræpn. (靴筒后的)靴襻;[计] 引导程序,辅助程序

以上是关于Canal或maxwell实时采集MySQL数据到Kafka的主要内容,如果未能解决你的问题,请参考以下文章

MaxWell(Mysql实时数据采集)

大数据之实时数据源同步中间件--生产上Canal与Maxwell颠峰对决

大数据课后作业:从零搭建canal实时采集数据

spring boot使用mysql-binlog-connector-java解析mysql binlog日志(实时+离线)

大数据实时传输组件Maxwell-问题集锦

Maxwell 一款简单易上手的实时抓取Mysql数据的软件