Collecting System Logs with MySQL + Canal + Kafka

Posted by 散人三水昜



This post covers deployment only; for the principles behind Canal, see the open-source project's documentation.

I. Introduction

Canal is Alibaba's component for incremental subscription and consumption of MySQL binlog. It masquerades as a MySQL replication slave, fetches the binlog from the MySQL server, parses it, and then delivers the changes to a message middleware (Kafka in this article) for further processing.

II. Environment

OS: CentOS 7.6 (any Linux distribution will do; check with cat /proc/version)

Java: JDK 1.8

Canal: preferably the latest release package

MySQL: 5.7.22 (Canal currently has constraints on supported MySQL versions; it is unclear whether these will be relaxed later)

Kafka: kafka_2.12-2.1.0.tgz

III. Deploying Kafka

Download Kafka: get kafka_2.12-2.1.0.tgz from the official website.

1. Extract it on the virtual machine (server)

tar -zxvf kafka_2.12-2.1.0.tgz


2. Go into the extracted Kafka directory and start ZooKeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties &
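
To confirm ZooKeeper is up, you can send it the "ruok" four-letter-word command (an optional check, not part of the original steps; it assumes nc is installed and that the bundled ZooKeeper still allows four-letter words, which is the default for the 3.4.x version shipped with this Kafka release):

echo ruok | nc localhost 2181   # a healthy ZooKeeper replies "imok"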


3. Open another terminal and start Kafka:

bin/kafka-server-start.sh -daemon config/server.properties &


4. Open another terminal and start a console consumer for the topic configured in the Canal instance (example). This step should be done after the Canal server is up and running.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example --from-beginning
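
The topic is normally auto-created when Canal first produces to it. If you prefer to create it explicitly (an optional step, not in the original), note that kafka-topics.sh in this Kafka version (2.1.0) still goes through ZooKeeper rather than --bootstrap-server:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic example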


IV. Installing and configuring MySQL

Installation steps are not covered here. The key points are to enable binlog writing and to set the binlog format to ROW.

1. Open the configuration file

vi /etc/my.cnf


2. Add the following:

[mysqld]
log-bin=mysql-bin    # enable binlog; adding this line is enough
binlog-format=ROW    # use ROW format
server_id=1          # required for MySQL replication; must not collide with Canal's slaveId

3. Save the file and restart MySQL

service mysqld restart


4. Create a user named canal and grant it MySQL slave privileges, so that MySQL treats Canal as a replication slave

Log in to MySQL first, then run:

-- create user canal with password canal

CREATE USER canal IDENTIFIED BY 'canal'; 

-- grant replication privileges on all databases and tables

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- grant all privileges as well; without this, canal has no permission to create databases

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

-- flush privileges

FLUSH PRIVILEGES;
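
Before starting Canal it is worth double-checking that binlog is actually on and in ROW format (an optional verification, not part of the original steps):

SHOW VARIABLES LIKE 'log_bin';         -- should be ON
SHOW VARIABLES LIKE 'binlog_format';   -- should be ROW
SHOW MASTER STATUS;                    -- shows the current binlog file and position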


V. Deploying the Canal server

1. Download Canal

The latest package at the time of writing is canal.deployer-1.1.4.tar.gz.

Official link: https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart


2. Upload canal.deployer-1.1.4.tar.gz to a directory of your choice and extract it. This is a matter of preference; I usually put it under /data.

mkdir -p /data/canal
tar -zxvf canal.deployer-1.1.4.tar.gz -C /data/canal


3. Edit the instance configuration file: vi conf/example/instance.properties

#################################################
## change these to your own database settings
...
canal.instance.master.address=127.0.0.1:3306
# username/password for the database
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
# this is the topic that messages are sent to in Kafka
canal.mq.topic=example
# dynamic topic routing by database name or table name
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
# db.table: unique primary key; separate multiple tables with commas
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################


4. Edit the Canal configuration file: vi /data/canal/conf/canal.properties

# ...
# options: tcp (default), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq cluster addresses, e.g. 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9092
canal.mq.retries = 0
# in flatMessage mode this can be increased, but do not exceed the MQ message size limit
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# in flatMessage mode increase this value; 50-200 is recommended
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal's batch size, default 50K; because of Kafka's max message size, do not exceed 1M (keep it under 900K)
canal.mq.canalBatchSize = 50
# timeout for Canal to fetch data, in milliseconds; empty means no timeout
canal.mq.canalGetTimeout = 100
# whether to send flat JSON (text) messages; otherwise binary data is written to Kafka, which works but is inconvenient to read
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# whether Kafka message delivery uses transactions
canal.mq.transaction = false



5. Start Canal

cd /data/canal
sh bin/startup.sh
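
To check that the server and the example instance started cleanly, you can tail the logs (assuming the default Canal deployer log layout):

tail -f logs/canal/canal.log        # server log
tail -f logs/example/example.log    # instance log; binlog parsing errors show up here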


VI. Testing

1. Create a test database and table, then insert, update, and delete rows; in other words, make some data changes in the database.

2. On the Kafka consumer side:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example --from-beginning


{"data":null,"database":"test","es":1589362351000,"id":4,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"alter table canal_test modify column id int unsigned not Null auto_increment primary key","sqlType":null,"table":"canal_test","ts":1589362465210,"type":"ALTER"}{"data":[{"id":"10","name":"kkkkkkkkkkkkk","create_time":"2020-05-13 17:35:45","update_time":"2020-05-13 17:35:45"}],"database":"test","es":1589362545000,"id":10,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(100)","create_time":"timestamp","update_time":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"create_time":93,"update_time":93},"table":"canal_test","ts":1589362545945,"type":"INSERT"}{"data":[{"id":"10","name":"kkkkkkkkk","create_time":"2020-05-13 17:35:45","update_time":"2020-05-13 17:35:45"}],"database":"test","es":1589362599000,"id":15,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(100)","create_time":"timestamp","update_time":"timestamp"},"old":[{"name":"kkkkkkkkkkkkk"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"create_time":93,"update_time":93},"table":"canal_test","ts":1589362599943,"type":"UPDATE"}{"data":[{"id":"10","name":"kkkkkkkkk","create_time":"2020-05-13 17:35:45","update_time":"2020-05-13 17:35:45"}],"database":"test","es":1589362639000,"id":18,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(100)","create_time":"timestamp","update_time":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"create_time":93,"update_time":93},"table":"canal_test","ts":1589362639115,"type":"DELETE"}



VII. Integrating with Spring Boot

1. Add the dependencies to pom.xml

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.0.RELEASE</version>
</dependency>

Check the Spring for Apache Kafka compatibility matrix for the matching versions.


2. Configuration file (application.yml)


server:
  port: 8081
spring:
  application:
    name: producer
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: Springboot-groupid
      auto-offset-reset: earliest
      enable-auto-commit: true
    bootstrap-servers: 192.168.76.128:9092   # the Kafka address

Note: Kafka's config/server.properties should have listeners=PLAINTEXT://192.168.76.128:9092 so that the broker is reachable at this address.


3. Write a consumer


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Description: consumer
 *
 * @author atang
 * @version V1.0
 * @date 2021-06-16
 */
@Component
public class Consumer {

    @KafkaListener(topics = {"example"})
    public void listener(ConsumerRecord<String, String> record) {
        Optional<String> msg = Optional.ofNullable(record.value());
        if (msg.isPresent()) {
            System.out.println(msg.get());
        }
    }
}


4. Start the project. Now, whenever you change data in the database, the messages are printed to the console. You can then post-process the received data and save it to the database as operation logs; I won't go into detail here, but a rough sketch follows below.
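
As a hedged illustration only (not the original author's code), the listener could parse the flat-message JSON shown in the test section with Jackson and hand the interesting fields to whatever persistence layer you use. The field names read below (database, table, type, data, sql, ts, isDdl) match the keys in the sample output above; the class and method names are made up for this sketch:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.List;

// Minimal sketch: turn one Canal flat message into simple "operation log" entries.
// Persisting them (JPA/MyBatis/etc.) is left out, as in the original post.
public class CanalMessageParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public List<String> toOperationLogs(String flatMessageJson) throws Exception {
        JsonNode root = MAPPER.readTree(flatMessageJson);
        String database = root.path("database").asText();
        String table = root.path("table").asText();
        String type = root.path("type").asText();   // INSERT / UPDATE / DELETE / ALTER ...
        long ts = root.path("ts").asLong();          // time Canal delivered the event

        List<String> logs = new ArrayList<>();
        JsonNode data = root.path("data");
        if (data.isArray() && data.size() > 0) {
            // DML messages carry the changed rows in "data"
            for (JsonNode row : data) {
                logs.add(String.format("%d %s %s.%s %s", ts, type, database, table, row.toString()));
            }
        } else {
            // DDL messages (isDdl=true) carry no row data, only the SQL text
            logs.add(String.format("%d %s %s.%s %s", ts, type, database, table, root.path("sql").asText()));
        }
        return logs;
    }
}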
