2.Flink实时项目之Maxwell介绍
Posted 选手一号位
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2.Flink实时项目之Maxwell介绍相关的知识,希望对你有一定的参考价值。
1. Maxwell介绍
Maxwell 是由美国 Zendesk 开源,用 Java 编写的 MySQL 实时抓取软件。 实时读取
MySQL 二进制日志 Binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、
RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。
官网地址:http://maxwells-daemon.io/
2. MySQL 主从复制过程
-
Master 主库将改变记录,写到二进制日志(binary log)中
-
Slave 从库向 mysql master 发送 dump 协议,将 master 主库的 binary log events
拷贝到它的中继日志(relay log);
-
Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。
3. Maxwell 的工作原理
很简单,就是把自己伪装成 slave,假装从 master 复制数据
4. MySQL 的 binlog
4.1 什么是 binlog
MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的。
一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:
-
其一:MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志
传递给 slaves 来达到 master-slave 数据一致的目的。
-
其二:自然就是数据恢复了,通过使用 mysqlbinlog 工具来使恢复数据。
二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有 的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除 了数据查询语句)语句事件。
4.2 binlog 的开启
找到 MySQL 配置文件的位置: /etc/my.cnf
在 mysql 的配置文件下,修改配置
在[mysqld] 区块,设置/添加 log-bin=mysql-bin
这个表示 binlog 日志的前缀是 mysql-bin,以后生成的日志文件就是mysql-bin.123456 的文件后面的数字按顺序生成,每次 mysql 重启或者到达单个文件大小的阈值时,新生一个文件,按顺序编号。
4.3 binlog 的分类设置
mysql binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。
在配置文件中可以选择配置
binlog_format= statement|mixed|row
-
statement
语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如
update tt set create_date=now() 如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。
优点: 节省空间
缺点: 有可能造成数据不一致。
-
row
行级, binlog 会记录每次操作后每行记录的变化。
优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录执行后的效果。
缺点:占用较大空间。
-
mixed
statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement 模式不一致问题
默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含 AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照 ROW 的方式进行处理
优点:节省空间,同时兼顾了一定的一致性。
缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对 binlog 的监控的情况都不方便。
综合上面对比,Maxwell 想做监控分析,选择 row 格式比较合适
5. Mysql的准备
5.1 创建实时业务数据库
数据库名:gmall2021
url:192.168.88.71:3306/gmall2021
username:root passwo:123456
导入数据:https://github.com/zhangbaohpu/gmall-mock/blob/master/data/gmall.sql
5.2 开启binlog
vim /etc/my.cnf
server-id= 1
log-bin=mysql-bin
binlog_format=row
#指定具体要同步的数据库
binlog-do-db=gmall2021
重启mysql服务,并查看初始化文件
service mysql restart
5.3 模拟生成数据
配置文件application.properties
logging.level.root=info
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.88.71:3306/gmall2021?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
logging.pattern.console=%m%n
mybatis-plus.global-config.db-config.field-strategy=not_null
#业务日期
mock.date=2021-05-30
#是否重置
mock.clear=1
#是否重置用户
mock.clear.user=0
#生成新用户数量
mock.user.count=1000
#男性比例
mock.user.male-rate=20
#用户数据变化概率
mock.user.update-rate:20
#收藏取消比例
mock.favor.cancel-rate=10
#收藏数量
mock.favor.count=100
#购物车数量
mock.cart.count=300
#每个商品最多购物个数
mock.cart.sku-maxcount-per-cart=3
#购物车来源 用户查询,商品推广,智能推荐, 促销活动
mock.cart.source-type-rate=60:20:10:10
#用户下单比例
mock.order.user-rate=95
#用户从购物中购买商品比例
mock.order.sku-rate=70
#是否参加活动
mock.order.join-activity=1
#是否使用购物券
mock.order.use-coupon=1
#购物券领取人数
mock.coupon.user-count=1000
#支付比例
mock.payment.rate=70
#支付方式 支付宝:微信 :银联
mock.payment.payment-type=30:60:10
#评价比例 好:中:差:自动
mock.comment.appraise-rate=30:10:10:50
#退款原因比例:质量问题 商品描述与实际描述不一致 缺货 号码不合适 拍错 不想买了 其他
mock.refund.reason-rate=30:10:20:5:15:5:5
将mock-db模块打包成jar
https://github.com/zhangbaohpu/gmall-mock.git
及配置文件上传到hadoop101的/opt/software/applog目录下,并运行jar
java -jar mock-db-0.0.1-SNAPSHOT.jar
再次到到/var/lib/mysql 目录下,查看 index 文件的大小,已经增大
6. Maxwell安装
6.1 安装
-
下载解压
下载地址:http://maxwells-daemon.io/
tar -zxvf maxwell-1.25.0.tar.gz -C /opt/module/
-
初始化maxwell元数据
-
创建数据库
在 MySQL 中建立一个 maxwell 库用于存储 Maxwell 的元数据
mysql -uroot -p123456
mysql> create database maxwell;
-
设置安全级别
执行如果错误请看:https://www.cnblogs.com/blog0403/p/14790397.html
mysql> set global validate_password_length=4;
mysql> set global validate_password_policy=0;
-
分配一个账号可以操作该数据库
mysql> GRANT ALL ON maxwell.* TO \'maxwell\'@\'%\' IDENTIFIED BY \'123456\';
库名:maxwell 账号:maxwell 密码:123456
-
分配这个账号可以监控其他数据库的权限
mysql> GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@\'%\';
-
6.2 配置
拷贝并修改配置文件
cd /opt/module/maxwell-1.25.0
cp config.properties.example config.properties
vim config.properties
# tl;dr config
log_level=info
producer=kafka
kafka.bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092
#添加内容
kafka_topic=ods_base_db_m
# mysql login info
#mysql所在机器
host=hadoop101
#maxwell采集用户
user=maxwell
password=123456
#同步历史数据需要用到
client_id=maxwell_1
注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱binlog 的顺序,如果要提高并行度,首先设置 kafka 的分区数>1,然后设置 producer_partition_by 属性,可选值producer_partition_by=database|table|primary_key|random| column
6.3 启动
在/home/zhangbao/bin创建启动脚本
vim maxwell.sh
/opt/module/maxwell-1.25.0/bin/maxwell --config /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &
maxwell已启动
7. 监控binglog日志
maxwell服务已启动,此时只要监控的数据库有数据变动,maxwell就可以监控到变更的数据,然后发送到kafka的topic:ods_base_db_m。
7.1 启动kafka消费端
cd /opt/module/kafka/bin
./kafka-console-consumer.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic ods_base_db_m
7.2 启动向数据库导入数据
向数据库gmall2021导入数据项目:https://github.com/zhangbaohpu/gmall-mock,模块mock-db
打包后有个jar包:mock-db-0.0.1-SNAPSHOT.jar,配置文件application.properties如下:
logging.level.root=info
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.88.71:3306/gmall2021?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
logging.pattern.console=%m%n
mybatis-plus.global-config.db-config.field-strategy=not_null
#业务日期
mock.date=2021-06-03
#是否重置
mock.clear=1
#是否重置用户
mock.clear.user=1
#生成新用户数量
mock.user.count=1000
#男性比例
mock.user.male-rate=20
#用户数据变化概率
mock.user.update-rate:20
#收藏取消比例
mock.favor.cancel-rate=10
#收藏数量
mock.favor.count=100
#购物车数量
mock.cart.count=300
#每个商品最多购物个数
mock.cart.sku-maxcount-per-cart=3
#购物车来源 用户查询,商品推广,智能推荐, 促销活动
mock.cart.source-type-rate=60:20:10:10
#用户下单比例
mock.order.user-rate=95
#用户从购物中购买商品比例
mock.order.sku-rate=70
#是否参加活动
mock.order.join-activity=1
#是否使用购物券
mock.order.use-coupon=1
#购物券领取人数
mock.coupon.user-count=1000
#支付比例
mock.payment.rate=70
#支付方式 支付宝:微信 :银联
mock.payment.payment-type=30:60:10
#评价比例 好:中:差:自动
mock.comment.appraise-rate=30:10:10:50
#退款原因比例:质量问题 商品描述与实际描述不一致 缺货 号码不合适 拍错 不想买了 其他
mock.refund.reason-rate=30:10:20:5:15:5:5
记住:jar包和配置文件在同一目录,方便修改配置
启动jar
cd /opt/software/applog
java -jar mock-db-0.0.1-SNAPSHOT.jar
7.3 kafka接收数据
在kafka消费端接收到的数据
"database": "gmall2021",
"table": "comment_info",
"type": "insert",
"ts": 1622733351,
"xid": 84599,
"xoffset": 16492,
"data":
"id": 1400471299563253795,
"user_id": 125,
"nick_name": null,
"head_img": null,
"sku_id": 16,
"spu_id": 4,
"order_id": 27460,
"appraise": "1204",
"comment_txt": "评论内容:24831521569529734613146941532892748384314624527997",
"create_time": "2021-06-03 23:15:51",
"operate_time": null
到此数仓的ods层已经完成,包括,日志数据及业务数据
以上是关于2.Flink实时项目之Maxwell介绍的主要内容,如果未能解决你的问题,请参考以下文章
Canal或maxwell实时采集MySQL数据到Kafka
Maxwell 一款简单易上手的实时抓取Mysql数据的软件
通过Maxwell实时增量抽取MySQL binlog并通过stdout展示