MaxWell(Mysql实时数据采集)

Posted 苍兰诺

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MaxWell(Mysql实时数据采集)相关的知识,希望对你有一定的参考价值。

mysql实时数据采集框架maxwell

基本介绍

maxwell是一个能实时读取MySQL二进制日志binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的dml指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。

maxwell主要提供了下列功能:

  1. 支持 SELECT * FROM table 的方式进44行全量数据初始化
  2. 支持在主库发生failover后,自动恢复binlog位置(GTID)
  3. 可以对数据进行分区,解决数据倾斜问题,发送到kafka的数据支持database、table、column等级别的数据分区
  4. 工作方式是伪装为Slave,接收binlog events,然后根据schemas信息拼装,可以接受ddl、xid、row等各种event

binlog介绍

binlog是mysql当中的二进制日志,主要用于记录对mysql数据库当中的数据发生或潜在发生更改的SQL语句,并以二进制的形式保存在磁盘中,如果后续我们需要配置主从数据库,如果我们需要从数据库同步主数据库的内容,我们就可以通过binlog来进行同步。说白了binlog可以用于解决实时同步mysql数据库当中的数据 binlog的格式也有三种:STATEMENT、ROW、MIXED 。

  • STATMENT模式:基于SQL语句的复制(statement-based replication, SBR),每一条会修改数据的sql语句会记录到binlog中。
    • 优点:不需要记录每一条SQL语句与每行的数据变化,这样子binlog的日志也会比较少,减少了磁盘IO,提高性能。
    • 缺点:在某些情况下会导致master-slave中的数据不一致(如sleep()函数, last_insert_id(),以及 user-defined functions(udf)等会出现问题)
  • 基于行的复制(row-based replication, RBR):不记录每一条SQL语句的上下文信息,仅需记录哪条数据被修改了,修改成了什么样子了。
    • 优点:不会出现某些特定情况下的存储过程或function、或trigger的调用和触发无法被正确复制的问题
    • 缺点:会产生大量的日志,尤其是alter table的时候会让日志暴涨。
  • 混合模式复制(mixed-based replication, MBR):以上两种模式的混合使用,一般的复制使 用STATEMENT模式保存binlog,对于STATEMENT模式无法复制的操作使用ROW模式保存binlog, MySQL会根据执行的SQL语句选择日志保存方式。

因为statement只有sql,没有数据,无法获 取原始的变更日志,所以一般建议为ROW模式) mysql数据实时同步,我们可以通过解析mysql的bin-log的方式来实现,解析bin-log可以有多种方式,可以通过canal,或者max-well等各种方式实现。以下是各种抽取方式的对比介绍

开启binlog功能

第一步:添加普通用户maxwell

为mysql添加一个普通用户maxwell,因为maxwell这个软件默认用户使用的是maxwell这个用户, 进入mysql客户端,然后执行以下命令,进行授权

mysql -uroot  -p 
set global validate_password_policy=LOW;
set global validate_password_length=6;
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
flush privileges;

第二步:开启binlog机制

对于我们的mysql数据库,我们可以开启mysql的binlog功能,通过修改mysql的配置文件来实现开启,修改完配置文件之后,需要重启mysql服务

sudo vim /etc/my.cnf
# 添加或修改以下三行配置
log-bin= /var/lib/mysql/mysql-bin # 日志存放位置
binlog-format=ROW # binlog格式
server_id=1 # server id
# 重启mysql
sudo service mysqld restart

第三步:验证mysql服务

# 进入mysql客户端,并执行以下命令进行验证
mysql -uroot -p
mysql> show variables like '%log_bin%';
+---------------------------------+-----------------------------+
| Variable_name                   | Value                       |
+---------------------------------+-----------------------------+
| log_bin                         | ON                          |
| log_bin_basename                | /var/lib/mysql/binlog       |
| log_bin_index                   | /var/lib/mysql/binlog.index |
| log_bin_trust_function_creators | OFF                         |
| log_bin_use_v1_row_events       | OFF                         |
| sql_log_bin                     | ON                          |
+---------------------------------+-----------------------------+

root@0f3525c74115:/# cd /var/lib/mysql/
root@0f3525c74115:/var/lib/mysql# ls -la
-rw-r-----  1 mysql mysql      324 Jul  2 14:54  binlog.000007
-rw-r-----  1 mysql mysql      156 Jul  2 14:54  binlog.000008
-rw-r-----  1 mysql mysql       32 Jul  2 14:54  binlog.index
# 以上是binlog文件说明已经开启

安装maxwell实现实时采集mysql数据

第一步:下载maxwell

下载:https://github.com/zendesk/maxwell/releases/download/v1.21.1/maxwell-1.21.1.tar.gz

第二步:修改配置

cp config.properties.example config.properties
vim config.properties
 #-----------------
producer=kafka
kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
host=node03
user=maxwell
password=123456
producer=kafka
host=node03
port=3306
user=maxwell
password=123456
kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
kafka_topic=maxwell_kafka
# 一定要注意:一定要保证我们使用maxwell用户和123456密码能够连接上mysql数据库

启动服务

  1. 启动我们的zookeeper服务
  2. 启动kafka服务并创建kafka的topic
  3. 启动maxwell服务,测试向数据库当中插入数据,并查看kafka当中是否能够同步到mysql数据
# 创建kafka topic 分区 3 副本 2
bin/kafka-topics.sh  --create --topic maxwell_kafka --partitions 3 --replication-factor 2 --zookeeper node01:2181

# 创建kafka 消费者 
bin/kafka-console-consumer.sh --topic maxwell_kafka --from-beginning --bootstrap-server node01:9092,node02:9092,node03:9092

# 启动maxwell
bin/maxwell

插入数据测试

-- 创建目标库
CREATE DATABASE /*!32312 IF NOT EXISTS*/`test` /*!40100 DEFAULT CHARACTER SET utf8 */;
USE `test`;

-- 创建表
CREATE TABLE `myuser` (
  `id` int(12NOT NULL,
  `name` varchar(32DEFAULT NULL,
  `age` varchar(32DEFAULT NULL,
  PRIMARY KEY (`id`)
ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- 插入数据
insert  into `myuser`(`id`,`name`,`age`values (1,'zhangsan',NULL),(2,'xxx',NULL),(3,'ggg',NULL),(5,'xxxx',NULL),(8,'skldjlskdf',NULL),(10,'ggggg',NULL),(99,'ttttt',NULL),(114,NULL,NULL),(121,'xxx',NULL);

-- 岂容kafka消费者,会看到消费的数据
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic  maxwell_kafka


以上是关于MaxWell(Mysql实时数据采集)的主要内容,如果未能解决你的问题,请参考以下文章

Canal或maxwell实时采集MySQL数据到Kafka

Maxwell 一款简单易上手的实时抓取Mysql数据的软件

使用maxwell实时同步mysql数据到kafka

maxwell解析mysql的binlog数据并保存到kafka使用

利用Maxwell组件实时监听Mysql的binlog日志

2.Flink实时项目之Maxwell介绍