MaxWell(Mysql实时数据采集)
Posted 苍兰诺
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MaxWell(Mysql实时数据采集)相关的知识,希望对你有一定的参考价值。
mysql实时数据采集框架maxwell
基本介绍
maxwell是一个能实时读取MySQL二进制日志binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的dml指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。
maxwell主要提供了下列功能:
-
支持 SELECT * FROM table 的方式进44行全量数据初始化 -
支持在主库发生failover后,自动恢复binlog位置(GTID) -
可以对数据进行分区,解决数据倾斜问题,发送到kafka的数据支持database、table、column等级别的数据分区 -
工作方式是伪装为Slave,接收binlog events,然后根据schemas信息拼装,可以接受ddl、xid、row等各种event
binlog介绍
binlog是mysql当中的二进制日志,主要用于记录对mysql数据库当中的数据发生或潜在发生更改的SQL语句,并以二进制的形式保存在磁盘中,如果后续我们需要配置主从数据库,如果我们需要从数据库同步主数据库的内容,我们就可以通过binlog来进行同步。说白了binlog可以用于解决实时同步mysql数据库当中的数据 binlog的格式也有三种:STATEMENT、ROW、MIXED 。
-
STATMENT模式:基于SQL语句的复制(statement-based replication, SBR),每一条会修改数据的sql语句会记录到binlog中。 -
优点:不需要记录每一条SQL语句与每行的数据变化,这样子binlog的日志也会比较少,减少了磁盘IO,提高性能。 -
缺点:在某些情况下会导致master-slave中的数据不一致(如sleep()函数, last_insert_id(),以及 user-defined functions(udf)等会出现问题) -
基于行的复制(row-based replication, RBR):不记录每一条SQL语句的上下文信息,仅需记录哪条数据被修改了,修改成了什么样子了。 -
优点:不会出现某些特定情况下的存储过程或function、或trigger的调用和触发无法被正确复制的问题 -
缺点:会产生大量的日志,尤其是alter table的时候会让日志暴涨。 -
混合模式复制(mixed-based replication, MBR):以上两种模式的混合使用,一般的复制使 用STATEMENT模式保存binlog,对于STATEMENT模式无法复制的操作使用ROW模式保存binlog, MySQL会根据执行的SQL语句选择日志保存方式。
因为statement只有sql,没有数据,无法获 取原始的变更日志,所以一般建议为ROW模式) mysql数据实时同步,我们可以通过解析mysql的bin-log的方式来实现,解析bin-log可以有多种方式,可以通过canal,或者max-well等各种方式实现。以下是各种抽取方式的对比介绍
开启binlog功能
第一步:添加普通用户maxwell
为mysql添加一个普通用户maxwell,因为maxwell这个软件默认用户使用的是maxwell这个用户, 进入mysql客户端,然后执行以下命令,进行授权
mysql -uroot -p
set global validate_password_policy=LOW;
set global validate_password_length=6;
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
flush privileges;
第二步:开启binlog机制
对于我们的mysql数据库,我们可以开启mysql的binlog功能,通过修改mysql的配置文件来实现开启,修改完配置文件之后,需要重启mysql服务
sudo vim /etc/my.cnf
# 添加或修改以下三行配置
log-bin= /var/lib/mysql/mysql-bin # 日志存放位置
binlog-format=ROW # binlog格式
server_id=1 # server id
# 重启mysql
sudo service mysqld restart
第三步:验证mysql服务
# 进入mysql客户端,并执行以下命令进行验证
mysql -uroot -p
mysql> show variables like '%log_bin%';
+---------------------------------+-----------------------------+
| Variable_name | Value |
+---------------------------------+-----------------------------+
| log_bin | ON |
| log_bin_basename | /var/lib/mysql/binlog |
| log_bin_index | /var/lib/mysql/binlog.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| sql_log_bin | ON |
+---------------------------------+-----------------------------+
root@0f3525c74115:/# cd /var/lib/mysql/
root@0f3525c74115:/var/lib/mysql# ls -la
-rw-r----- 1 mysql mysql 324 Jul 2 14:54 binlog.000007
-rw-r----- 1 mysql mysql 156 Jul 2 14:54 binlog.000008
-rw-r----- 1 mysql mysql 32 Jul 2 14:54 binlog.index
# 以上是binlog文件说明已经开启
安装maxwell实现实时采集mysql数据
第一步:下载maxwell
下载:https://github.com/zendesk/maxwell/releases/download/v1.21.1/maxwell-1.21.1.tar.gz
第二步:修改配置
cp config.properties.example config.properties
vim config.properties
#-----------------
producer=kafka
kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
host=node03
user=maxwell
password=123456
producer=kafka
host=node03
port=3306
user=maxwell
password=123456
kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
kafka_topic=maxwell_kafka
# 一定要注意:一定要保证我们使用maxwell用户和123456密码能够连接上mysql数据库
启动服务
-
启动我们的zookeeper服务 -
启动kafka服务并创建kafka的topic -
启动maxwell服务,测试向数据库当中插入数据,并查看kafka当中是否能够同步到mysql数据
# 创建kafka topic 分区 3 副本 2
bin/kafka-topics.sh --create --topic maxwell_kafka --partitions 3 --replication-factor 2 --zookeeper node01:2181
# 创建kafka 消费者
bin/kafka-console-consumer.sh --topic maxwell_kafka --from-beginning --bootstrap-server node01:9092,node02:9092,node03:9092
# 启动maxwell
bin/maxwell
插入数据测试
-- 创建目标库
CREATE DATABASE /*!32312 IF NOT EXISTS*/`test` /*!40100 DEFAULT CHARACTER SET utf8 */;
USE `test`;
-- 创建表
CREATE TABLE `myuser` (
`id` int(12) NOT NULL,
`name` varchar(32) DEFAULT NULL,
`age` varchar(32) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 插入数据
insert into `myuser`(`id`,`name`,`age`) values (1,'zhangsan',NULL),(2,'xxx',NULL),(3,'ggg',NULL),(5,'xxxx',NULL),(8,'skldjlskdf',NULL),(10,'ggggg',NULL),(99,'ttttt',NULL),(114,NULL,NULL),(121,'xxx',NULL);
-- 岂容kafka消费者,会看到消费的数据
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic maxwell_kafka
以上是关于MaxWell(Mysql实时数据采集)的主要内容,如果未能解决你的问题,请参考以下文章
Canal或maxwell实时采集MySQL数据到Kafka
Maxwell 一款简单易上手的实时抓取Mysql数据的软件