小记---------maxwell 一个可以实时读取mysql二进制日志binlog,并生成JSON格式的消息,作为生产者发送给kafka,Redis,文件或其他平台的应用程序
Posted 于二黑
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了小记---------maxwell 一个可以实时读取mysql二进制日志binlog,并生成JSON格式的消息,作为生产者发送给kafka,Redis,文件或其他平台的应用程序相关的知识,希望对你有一定的参考价值。
maxwell主要提供了下列功能
支持 SELECT * FROM table 的方式进行全量数据初始化
支持在主库发生failover后,自动回复binlog位置(GTID)
可以对数据进行分区,解决数据倾斜问题,发送到kafka的数据支持database,table,column等级别的数据分区
工作方式是伪装为Slave,接受binlog events, 然后根据schemas信息拼装,可以接受ddl、xid、row等各种event
1.首先配置mysql 启用binlog (因为我们没有mysql设置的权限, 只需要让运维帮忙打开maxwell权限即可)
$ vi my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row
2.创建一个yzq.properties文件
log_level=INFO
producer=kafka
host = 120.27.208.185 //mysql数据库的URL
user = maxwell //就固定值
password = 123456 //密码
producer_ack_timeout = 600000
######### output format stuff ###############
output_binlog_position=ture
output_server_id=true
output_thread_id=ture
output_commit_info=true
output_row_query=true
output_ddl=false
output_nulls=true
output_xoffset=true
output_schema_id=true
############ kafka stuff #############
kafka.bootstrap.servers=df1:9092,df2:9092,df3:9092
kafka_topic=dfgo //kafka的topic 名称
kafka_partition_hash=murmur3
kafka_key_format=hash
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=all
producer_partition_by=primary_key
############## misc stuff ###########
bootstrapper=async
############## filter ###############
filter=exclude:*.*, include: och_test.order_info_201904,include: och_test.order_info_201905,include: och_test.order_info_201906,include: och_test.order_info_201907,include: och_test.order_info_201908,include: och_test.order_info_201906,include: och_test.order_info_201910,include: och_test.order_info_201911,include: och_test.order_info_201912,include: och_test.renter_info,include: och_test.driver_info
3.执行----- bin/maxwell --config yzq.properties
即可
以上是关于小记---------maxwell 一个可以实时读取mysql二进制日志binlog,并生成JSON格式的消息,作为生产者发送给kafka,Redis,文件或其他平台的应用程序的主要内容,如果未能解决你的问题,请参考以下文章
Canal或maxwell实时采集MySQL数据到Kafka
Maxwell 一款简单易上手的实时抓取Mysql数据的软件