2.Flink实时项目之Maxwell介绍

Posted 选手一号位

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2.Flink实时项目之Maxwell介绍相关的知识,希望对你有一定的参考价值。

1. Maxwell介绍

Maxwell 是由美国 Zendesk 开源,用 Java 编写的 MySQL 实时抓取软件。 实时读取

MySQL 二进制日志 Binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、

RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。

官网地址:http://maxwells-daemon.io/

2. MySQL 主从复制过程

  • Master 主库将改变记录,写到二进制日志(binary log)中

  • Slave 从库向 mysql master 发送 dump 协议,将 master 主库的 binary log events

拷贝到它的中继日志(relay log);

  • Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。

3. Maxwell 的工作原理

很简单,就是把自己伪装成 slave,假装从 master 复制数据

4. MySQL 的 binlog

4.1 什么是 binlog

MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的。

一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:

  • 其一:MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志

    传递给 slaves 来达到 master-slave 数据一致的目的。

  • 其二:自然就是数据恢复了,通过使用 mysqlbinlog 工具来使恢复数据。

二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有 的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除 了数据查询语句)语句事件。

4.2 binlog 的开启

找到 MySQL 配置文件的位置: /etc/my.cnf

在 mysql 的配置文件下,修改配置

在[mysqld] 区块,设置/添加 log-bin=mysql-bin

这个表示 binlog 日志的前缀是 mysql-bin,以后生成的日志文件就是mysql-bin.123456 的文件后面的数字按顺序生成,每次 mysql 重启或者到达单个文件大小的阈值时,新生一个文件,按顺序编号。

4.3 binlog 的分类设置

mysql binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。

在配置文件中可以选择配置

binlog_format= statement|mixed|row

  • statement

    语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如

    update tt set create_date=now() 如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。

    优点: 节省空间

    缺点: 有可能造成数据不一致。

  • row

    行级, binlog 会记录每次操作后每行记录的变化。

    优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录执行后的效果。

    缺点:占用较大空间。

  • mixed

    statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement 模式不一致问题

    默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含 AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照 ROW 的方式进行处理

    优点:节省空间,同时兼顾了一定的一致性。

    缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对 binlog 的监控的情况都不方便。

综合上面对比,Maxwell 想做监控分析,选择 row 格式比较合适

5. Mysql的准备

5.1 创建实时业务数据库

数据库名:gmall2021

url:192.168.88.71:3306/gmall2021

username:root passwo:123456

导入数据:https://github.com/zhangbaohpu/gmall-mock/blob/master/data/gmall.sql

5.2 开启binlog

vim /etc/my.cnf

server-id= 1
log-bin=mysql-bin
binlog_format=row
#指定具体要同步的数据库
binlog-do-db=gmall2021

重启mysql服务,并查看初始化文件

service mysql restart

5.3 模拟生成数据

配置文件application.properties

logging.level.root=info
​
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.88.71:3306/gmall2021?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
​
logging.pattern.console=%m%n
​
mybatis-plus.global-config.db-config.field-strategy=not_null
​
#业务日期
mock.date=2021-05-30
#是否重置
mock.clear=1
#是否重置用户
mock.clear.user=0
​
#生成新用户数量
mock.user.count=1000
#男性比例
mock.user.male-rate=20
#用户数据变化概率
mock.user.update-rate:20
​
#收藏取消比例
mock.favor.cancel-rate=10
#收藏数量
mock.favor.count=100
​
#购物车数量
mock.cart.count=300
#每个商品最多购物个数
mock.cart.sku-maxcount-per-cart=3
#购物车来源  用户查询,商品推广,智能推荐, 促销活动
mock.cart.source-type-rate=60:20:10:10
​
#用户下单比例
mock.order.user-rate=95
#用户从购物中购买商品比例
mock.order.sku-rate=70
#是否参加活动
mock.order.join-activity=1
#是否使用购物券
mock.order.use-coupon=1
#购物券领取人数
mock.coupon.user-count=1000
​
#支付比例
mock.payment.rate=70
#支付方式 支付宝:微信 :银联
mock.payment.payment-type=30:60:10
​
​
#评价比例 好:中:差:自动
mock.comment.appraise-rate=30:10:10:50
​
#退款原因比例:质量问题 商品描述与实际描述不一致 缺货 号码不合适 拍错 不想买了 其他
mock.refund.reason-rate=30:10:20:5:15:5:5
​

将mock-db模块打包成jar

https://github.com/zhangbaohpu/gmall-mock.git

及配置文件上传到hadoop101的/opt/software/applog目录下,并运行jar

java -jar mock-db-0.0.1-SNAPSHOT.jar

再次到到/var/lib/mysql 目录下,查看 index 文件的大小,已经增大

6. Maxwell安装

6.1 安装

  • 下载解压

    下载地址:http://maxwells-daemon.io/

    tar -zxvf maxwell-1.25.0.tar.gz -C /opt/module/

  • 初始化maxwell元数据

    • 创建数据库

      在 MySQL 中建立一个 maxwell 库用于存储 Maxwell 的元数据

      mysql -uroot -p123456

      mysql> create database maxwell;

    • 设置安全级别

      执行如果错误请看:https://www.cnblogs.com/blog0403/p/14790397.html

      mysql> set global validate_password_length=4;

      mysql> set global validate_password_policy=0;

    • 分配一个账号可以操作该数据库

      mysql> GRANT ALL ON maxwell.* TO \'maxwell\'@\'%\' IDENTIFIED BY \'123456\';

      库名:maxwell 账号:maxwell 密码:123456

    • 分配这个账号可以监控其他数据库的权限

      mysql> GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@\'%\';

6.2 配置

拷贝并修改配置文件

cd /opt/module/maxwell-1.25.0

cp config.properties.example config.properties

vim config.properties

# tl;dr config
log_level=info
​
producer=kafka
kafka.bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092
#添加内容
kafka_topic=ods_base_db_m
# mysql login info
#mysql所在机器
host=hadoop101
#maxwell采集用户
user=maxwell
password=123456
#同步历史数据需要用到
client_id=maxwell_1

注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱binlog 的顺序,如果要提高并行度,首先设置 kafka 的分区数>1,然后设置 producer_partition_by 属性,可选值producer_partition_by=database|table|primary_key|random| column

6.3 启动

在/home/zhangbao/bin创建启动脚本

vim maxwell.sh

/opt/module/maxwell-1.25.0/bin/maxwell --config /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &

maxwell已启动

7. 监控binglog日志

maxwell服务已启动,此时只要监控的数据库有数据变动,maxwell就可以监控到变更的数据,然后发送到kafka的topic:ods_base_db_m。

7.1 启动kafka消费端

cd /opt/module/kafka/bin

./kafka-console-consumer.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic ods_base_db_m

7.2 启动向数据库导入数据

向数据库gmall2021导入数据项目:https://github.com/zhangbaohpu/gmall-mock,模块mock-db

打包后有个jar包:mock-db-0.0.1-SNAPSHOT.jar,配置文件application.properties如下:

logging.level.root=info
​
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.88.71:3306/gmall2021?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
​
logging.pattern.console=%m%n
​
mybatis-plus.global-config.db-config.field-strategy=not_null
​
#业务日期
mock.date=2021-06-03
#是否重置
mock.clear=1
#是否重置用户
mock.clear.user=1
​
#生成新用户数量
mock.user.count=1000
#男性比例
mock.user.male-rate=20
#用户数据变化概率
mock.user.update-rate:20
​
#收藏取消比例
mock.favor.cancel-rate=10
#收藏数量
mock.favor.count=100
​
#购物车数量
mock.cart.count=300
#每个商品最多购物个数
mock.cart.sku-maxcount-per-cart=3
#购物车来源  用户查询,商品推广,智能推荐, 促销活动
mock.cart.source-type-rate=60:20:10:10
​
#用户下单比例
mock.order.user-rate=95
#用户从购物中购买商品比例
mock.order.sku-rate=70
#是否参加活动
mock.order.join-activity=1
#是否使用购物券
mock.order.use-coupon=1
#购物券领取人数
mock.coupon.user-count=1000
​
#支付比例
mock.payment.rate=70
#支付方式 支付宝:微信 :银联
mock.payment.payment-type=30:60:10
​
#评价比例 好:中:差:自动
mock.comment.appraise-rate=30:10:10:50
​
#退款原因比例:质量问题 商品描述与实际描述不一致 缺货 号码不合适 拍错 不想买了 其他
mock.refund.reason-rate=30:10:20:5:15:5:5

记住:jar包和配置文件在同一目录,方便修改配置

启动jar

cd /opt/software/applog

java -jar mock-db-0.0.1-SNAPSHOT.jar

7.3 kafka接收数据

在kafka消费端接收到的数据


  "database": "gmall2021",
  "table": "comment_info",
  "type": "insert",
  "ts": 1622733351,
  "xid": 84599,
  "xoffset": 16492,
  "data": 
    "id": 1400471299563253795,
    "user_id": 125,
    "nick_name": null,
    "head_img": null,
    "sku_id": 16,
    "spu_id": 4,
    "order_id": 27460,
    "appraise": "1204",
    "comment_txt": "评论内容:24831521569529734613146941532892748384314624527997",
    "create_time": "2021-06-03 23:15:51",
    "operate_time": null
  

到此数仓的ods层已经完成,包括,日志数据及业务数据

以上是关于2.Flink实时项目之Maxwell介绍的主要内容,如果未能解决你的问题,请参考以下文章

MaxWell(Mysql实时数据采集)

Canal或maxwell实时采集MySQL数据到Kafka

Maxwell 一款简单易上手的实时抓取Mysql数据的软件

通过Maxwell实时增量抽取MySQL binlog并通过stdout展示

Canal或maxwell实时采集MySQL数据到Kafka

使用maxwell实时同步mysql数据到kafka