mysql+canal+kafka做系统日志收集
Posted 散人三水昜
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mysql+canal+kafka做系统日志收集相关的知识,希望对你有一定的参考价值。
这里只介绍部署,简介原理参见开源介绍。
一、简介
canal作为阿里巴巴 mysql binlog 增量订阅&消费组件,可以模拟成mysql的备份库slave,然后从mysql数据库获得binlog日志进行解析,再传给消息中间件(本文介绍的是kafka)进行二次处理。
二、环境
操作系统:centos7.6(这个无所谓,linux系统就行,查看命令 cat /proc/version)
java版本:jdk1.8
canal版本:最好下载最新的安装包
mysql版本:5.7.22(目前canal对mysql版本有要求,不知道后面会不会放开)
kafka版本:kafka_2.12-2.1.0.tgz
三、部署kafka
kafka下载:从官网下载 kafka_2.12-2.1.0.tgz,
1.解压在虚拟机(服务器)上
tar -zxvf kafka_2.12-2.1.0.tgz
2.进入到kafka的解压目录,启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties &
3.打开另一个终端,启动kafka:
bin/kafka-server-start.sh -daemon config/server.properties &
4.打开另一个终端,创建消费者(即canal中配置的实例中的topic):example,这一步应该在canal server启动好后做
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example --from-beginning
四、安装配置mysql
具体安装步骤不做说明,主要要开启binlog写入功能,并且配置binlog模式为row
1.打开配置文件
vi /etc/my.cnf
2.添加下面这段:
[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
3.保存后,重启mysql
service mysqld restart
4. 创建用户canal,并赋予mysql slave的权限,目的是让mysql以为canal是一个slave
先登录mysql,然后
--创建用户canal,密码canal
CREATE USER canal IDENTIFIED BY 'canal';
--赋权所有库表
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
--赋所有权,最好也执行一下,不然canal是没有建库的权限的
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
--刷新
FLUSH PRIVILEGES;
五、部署canal server
1.下载canal
本次下载的最新包名为canal.deployer-1.1.4.tar.gz
官方连接:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
2.上传canal.deployer-1.1.4.tar.gz到指定目录并解压,这个看个人习惯,我通常是放到/data下的
mkdir -p /data/canal
tar -zxvf canal.deployer-1.1.4.tar.gz
3.修改instance 配置文件 vi conf/example/instance.properties
# 按需修改成自己的数据库信息
################################################
...
canal.instance.master.address=127.0.0.1:3306
username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
mq config
这个topic就是发到kafka的
canal.mq.topic=example
针对库名或者表名发送动态topic
canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
hash partition config
canal.mq.partitionsNum=3
库名.表名: 唯一主键,多个表之间用逗号分隔
canal.mq.partitionHash=mytest.person:id,mytest.role:id
################################################
4.修改canal 配置文件vi /usr/local/canal/conf/canal.properties
# ...
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9092
canal.mq.retries = 0
# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下请将该值改大, 建议50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象 使用文本格式(JSON)进行传输,否则Kafka里扔进去的是二进制数据,虽然不影响,但是看起来不方便
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false
5.启动canal
cd /data/canal
sh bin/startup.sh
六、测试
1.创建测试库、表、增删改修,就是在数据库中做一些数据变更的操作
2.kafka消费端:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example --from-beginning
{"data":null,"database":"test","es":1589362351000,"id":4,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"alter table canal_test modify column id int unsigned not Null auto_increment primary key","sqlType":null,"table":"canal_test","ts":1589362465210,"type":"ALTER"}
{"data":[{"id":"10","name":"kkkkkkkkkkkkk","create_time":"2020-05-13 17:35:45","update_time":"2020-05-13 17:35:45"}],"database":"test","es":1589362545000,"id":10,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(100)","create_time":"timestamp","update_time":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"create_time":93,"update_time":93},"table":"canal_test","ts":1589362545945,"type":"INSERT"}
{"data":[{"id":"10","name":"kkkkkkkkk","create_time":"2020-05-13 17:35:45","update_time":"2020-05-13 17:35:45"}],"database":"test","es":1589362599000,"id":15,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(100)","create_time":"timestamp","update_time":"timestamp"},"old":[{"name":"kkkkkkkkkkkkk"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"create_time":93,"update_time":93},"table":"canal_test","ts":1589362599943,"type":"UPDATE"}
{"data":[{"id":"10","name":"kkkkkkkkk","create_time":"2020-05-13 17:35:45","update_time":"2020-05-13 17:35:45"}],"database":"test","es":1589362639000,"id":18,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(100)","create_time":"timestamp","update_time":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"create_time":93,"update_time":93},"table":"canal_test","ts":1589362639115,"type":"DELETE"}
七、集成springboot
1.pom引入
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
spring for kafka对应版本
2.配置文件
server:
port: 8081
spring:
application:
name: producer
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: Springboot-groupid
auto-offset-reset: earliest
enable-auto-commit: true
bootstrap-servers: 192.168.76.128:9092 #这里是kafka的地址
listeners=PLAINTEXT://192.168.76.128:9092
3.编写一个消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* Description: 消费者
*
* @author atang
* @version V1.0
* @date 2021-06-16
*/
public class Consumer {
public void listener(ConsumerRecord record){
Optional msg = Optional.ofNullable(record.value());
if (msg.isPresent()){
System.out.println(msg.get());
}
}
}
4.启动项目,这时候修改数据库控制台会打印结果。可以对接收到数据进行二次整理,然后保存到数据库作为操作日志,这里就不详细写了。
以上是关于mysql+canal+kafka做系统日志收集的主要内容,如果未能解决你的问题,请参考以下文章
Canal利用canal实现mysql实时增量备份并对接kafka
flinksql从kafka中消费mysql的binlog日志