97 基于Binlog实现MySQL与Redis数据一致性问题
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了97 基于Binlog实现MySQL与Redis数据一致性问题相关的知识,希望对你有一定的参考价值。
参考技术A mysql 与Redis 数据一致性问题 直接将Redis清空中间件 canal框架 基于 docker环境构建
canal 框架 原理:
<u>https://gitee.com/mirrors/canal?utm_source=alading&utm_campaign=repo</u>
canal 框架原理
1,canal伪装成mysql从节点 订阅mysql 主节点的binlog文件
2,当我们的mysql 主节点 binlog 文件发生了变化,则将binlog 文件发送给canal服务器端
3,canal 服务器端将该binlog 文件二进制转换成json格式给canal客户端
4,canal客户端在将改数据同步到Redis/ES
基于Binlog 开启方式
1.mysql 开启binlog 文件配置
windows 配置
查询 my.ini配置文件位置
C:\ProgramData\MySQL\MySQL Server 5.7
2, linux mysql
安装canal
进入容器
编辑配置文件
重启canal
Docker-compose 构建canal
canal.instance.mysql.slaveId:slaveId不能与mysql的serverId一样
canal.instance.master.address:mysql地址
canal.instance.dbUsername:mysql账号
canal.instance.dbPassword:mysql密码
Canal+Kafka实现MySql与Redis数据一致性
在生产环境中,经常会遇到MySql与Redis数据不一致的问题。那么如何能够保证MySql与Redis数据一致性的问题呢?话不多说,咱们直接上解决方案。
如果对Canal还不太了解的可以先去看一下官方文档:https://github.com/alibaba/canal
首先,咱们得先开启MySql的允许基于BinLog文件主从复制。因为Canal的核心原理也是相当于把自己当成MySql的一个从节点,然后去订阅主节点的BinLog日志。
开启BinLog文件配置
1. 配置MySQL的 my.ini/my.cnf 开启允许基于binlog文件主从同步
log-bin=mysql-bin #添加这一行就ok binlog-format=ROW #选择row模式 server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
配置该文件后,重启mysql服务器即可
show variables like ‘log_bin‘;//查询MySql是否开启了log_bin.没有开启log_bin的值是OFF,开启之后是ON
2. 添加cannl的账号或者直接使用自己的root账号。添加完后一定要检查mysql user 权限为y(SELECT* from `user` where user=‘canal‘)
drop user ‘canal‘@‘%‘; CREATE USER ‘canal‘@‘%‘ IDENTIFIED BY ‘canal‘; grant all privileges on *.* to ‘canal‘@‘%‘ identified by ‘canal‘; flush privileges;
整合Kafka
1. 由于Kafka依赖Zookeeper,先安装zookeeper
zoo_sample.cfg 修改为 zoo.cfg
修改 zoo.cfg 中的 dataDir=E:zkkafkazookeeper-3.4.14data
新增环境变量:
ZOOKEEPER_HOME: E:zkkafkazookeeper-3.4.14 (zookeeper目录)
Path: 在现有的值后面添加 ";%ZOOKEEPER_HOME%in;"
运行zk zkServer.cmd。
针对闪退,可按照以下步骤进行解决(参考:https://blog.csdn.net/pangdongh/article/details/90208230):
1 、编辑zkServer.cmd文件末尾添加pause
。这样运行出错就不会退出,会提示错误信息,方便找到原因。
2.如果报错内容为:-Dzookeeper.log.dir=xxx"‘ 不是内部或外部命令,也不是可运行的程序 或批处理文件的解决。则建议修改zkServer.cmd文件:
@echo off REM Licensed to the Apache Software Foundation (ASF) under one or more REM contributor license agreements. See the NOTICE file distributed with REM this work for additional information regarding copyright ownership. REM The ASF licenses this file to You under the Apache License, Version 2.0 REM (the "License"); you may not use this file except in compliance with REM the License. You may obtain a copy of the License at REM REM http://www.apache.org/licenses/LICENSE-2.0 REM REM Unless required by applicable law or agreed to in writing, software REM distributed under the License is distributed on an "AS IS" BASIS, REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. REM See the License for the specific language governing permissions and REM limitations under the License. setlocal call "%~dp0zkEnv.cmd" set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain echo on java "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %* endlocal pause
2. 安装kafka
解压 kafka_2.13-2.4.0 改名为 kafka
修改 server.properties中的配置
log.dirs=E:zkkafkakafkalogs
启动Kafka:
Cmd 进入到该目录:
cd E:zkkafkakafka
.inwindowskafka-server-start.bat .configserver.properties
如果启动报系统找不到指定的路径,进入kafka目录kafkainwindowskafka-run-class.bat,将set JAVA="%JAVA_HOME%/bin/java"改为java环境安装的绝对路径
例如:set JAVA="D:LIJDKjdk1.8.0_152injava"
Canal配置更改
1.修改 example/instance.properties
canal.mq.topic=maikt-topic
2.修改 canal.properties
# tcp, kafka, RocketMQ
canal.serverMode = kafka
canal.mq.servers = 127.0.0.1:9092
3.启动startup.bat 查看 logsexample example.log日志文件是否有 start successful....
SpringBoot项目整合kafka
maven依赖
<!-- springBoot集成kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- SpringBoot整合Web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
application.yml
# kafka spring: kafka: # kafka服务器地址(可以多个) bootstrap-servers: 127.0.0.1:9092 consumer: # 指定一个默认的组名 group-id: kafka2 # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 auto-offset-reset: earliest # key/value的反序列化 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: # key/value的序列化 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # 批量抓取 batch-size: 65536 # 缓存容量 buffer-memory: 524288 redis: host: 1.1.1.1 # password: port: 6379 database: 10 password: 123456
Redis工具类
@Component public class RedisUtils { /** * 获取我们的redis模版 */ @Autowired private StringRedisTemplate stringRedisTemplate; public void setString(String key, String value) { setString(key, value, null); } public void setString(String key, String value, Long timeOut) { stringRedisTemplate.opsForValue().set(key, value); if (timeOut != null) { stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS); } } public String getString(String key) { return stringRedisTemplate.opsForValue().get(key); } /** * redis当成数据库中 * <p> * 注意事项:对我们的redis的key设置一个有效期 */ public boolean deleteKey(String key) { return stringRedisTemplate.delete(key); } }
Kafka主题监听方法(往redis同步数据的代码可以根据自己的需求去完善,本代码只是做测试用)
@KafkaListener(topics = "maikt-topic") public void receive(ConsumerRecord<?, ?> consumer) { System.out.println("topic名称:" + consumer.topic() + ",key:" + consumer.key() + "," + "分区位置:" + consumer.partition() + ", 下标" + consumer.offset() + "," + consumer.value()); String json = (String) consumer.value(); JSONObject jsonObject = JSONObject.parseObject(json); String sqlType = jsonObject.getString("type"); JSONArray data = jsonObject.getJSONArray("data"); if(data!=null) { JSONObject userObject = data.getJSONObject(0); String id = userObject.getString("id"); String database = jsonObject.getString("database"); String table = jsonObject.getString("table"); String key = database + "_" + table + "_" + id; if ("UPDATE".equals(sqlType) || "INSERT".equals(sqlType)) { redisUtils.setString(key, userObject.toJSONString()); return; } if ("DELETE".equals(sqlType)) { redisUtils.deleteKey(key); } } }
第一次写文章,如果有不足的地方,欢迎各位大佬指正。
来源于:蚂蚁课堂
以上是关于97 基于Binlog实现MySQL与Redis数据一致性问题的主要内容,如果未能解决你的问题,请参考以下文章
docker环境安装mysqlcanalelasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中