基于Canal的Mysql&Redis数据同步实现
Posted ZNineSun
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Canal的Mysql&Redis数据同步实现相关的知识,希望对你有一定的参考价值。
文章目录
1.Canal简介
我们在做mysql与redis的数据同步时,往往采用的是代码层实现,或者通过spring-cache等缓存框架。但是仍然有某些场景,比如说原项目无源码,或者不能进行二开时,就需要独立的第三方来实现数据同步。
我们需要一种无代码入侵式的数据同步,完全由第三方组件管理。这就需要借助canal来实现mysql到redis的数据同步
canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)
其工作原理主要是模仿MySQL复制:
- canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- canal解析binary log对象(原始为byte流)
Canal架构如下:
- eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
- eventStore (数据存储)
- metaManager (增量订阅&消费信息管理器)
- server代表一个canal运行实例,对应于一个jvm
- instance对应于一个数据队列 (1个server对应1…n个instance)
instance模块如下图所示:
大致的解析过程如下:
- parse解析MySQL的Bin log,然后将数据放入到sink中
- sink对数据进行过滤,加工,分发
- store从sink中读取解析好的数据存储起来
- 然后自己用设计代码将store中的数据同步写入Redis中就可以了
- 其中parse/sink是框架封装好的,我们做的是store的数据读取那一步
canal分成服务端deployer和客户端adapter,我们可以部署多个,同时为了方便管理还提供了一个管理端admin。
因为目前canal还不能直接通过配置就实现对redis的数据同步,因此我们需要自定义一下canal客户端,通过服务端将数据同步到客户端后,由客户端自定义操作同步到redis
2.Canal安装
canal支持多种安装方式,本文提供了windows环境下和docker环境下的安装方式,详情请移步:《Canal安装教程》
在此基础上,本文相关配置如下:
- mysql
- 端口号:3307
- ip:192.168.0.104
- 待匹配表:canal_demo/tb_user
SQL:
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for tb_user
-- ----------------------------
DROP TABLE IF EXISTS `tb_user`;
CREATE TABLE `tb_user` (
`user_id` int NOT NULL AUTO_INCREMENT,
`user_name` varchar(255) COLLATE utf8_bin DEFAULT NULL,
`age` int DEFAULT NULL,
`sex` char(2) COLLATE utf8_bin DEFAULT NULL,
PRIMARY KEY (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb3 COLLATE=utf8_bin;
对应的instance.properties如下:
- canal
在canal-server目录下新建一个redis目录,复制examlpe的所有内容到redis中-
windows环境下
-
docker环境下
-
重新启动canal-server服务即可
进入 canal-admin 的控制台,如果你的配置正确,server 列表里会自动出现启动:
在 instance 管理中新建一个 instance,注意名字要和刚才复制的文件夹名字对应:
过几秒钟以后,如果你的配置正确,instance 列表里,如果列表里的状态是停止,可以在操作里手动启动
3.SpringBoot集成Canal实现数据同步
为了节约文章篇幅,本文只贴出核心代码,完整代码可在文末进行查看或下载
3.1 添加maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.72</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
由于我们需要对数据库操作来观察Redis中的变化,建议大家集成一下Mybatis或Mybatis-Plus实际操作一下:
3.2 添加配置
#redis配置
# Redis服务器地址
spring.redis.host=192.168.0.104
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# Redis数据库索引(默认为0)
spring.redis.database=0
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=8
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.timeout=0
spring.redis.lettuce.shutdown-timeout=0
3.3 添加Redis操作的工具类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
@Service
public class RedisUtils
@Autowired
private RedisTemplate redisTemplate;
/**
* 写入缓存 * @param key * @param value * @return
*/
public boolean set(final String key, Object value)
boolean result = false;
try
ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();
operations.set(key, value);
result = true;
catch (Exception e)
e.printStackTrace();
return result;
/**
* 写入缓存设置时效时间 * @param key * @param value * @return
*/
public boolean setEx(final String key, Object value, Long expireTime)
boolean result = false;
try
ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();
operations.set(key, value);
redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);
result = true;
catch (Exception e)
e.printStackTrace();
return result;
/**
* 判断缓存中是否有对应的value * @param key * @return
*/
public boolean exists(final String key)
return redisTemplate.hasKey(key);
/**
* 读取缓存 * @param key * @return
*/
public Object get(final String key)
Object result = null;
ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();
result = operations.get(key);
return result;
/**
* 删除对应的value * @param key
*/
public boolean remove(final String key)
if (exists(key))
Boolean delete = redisTemplate.delete(key);
return delete;
return false;
3.4 新建 canal 客户端
新建一个canal 客户端,并且依赖 ApplicationRunner,在 Spring 容器启动完成后开启守护线程同步任务(注意 import 时选择 canal 包下的类):
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.demo.util.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Slf4j
@Component
public class CanalClient implements ApplicationRunner
@Autowired
RedisUtils redisUtils;
private static final String TABLE_NAME = "tb_user";
private static final String PRIMARY_KEY = "user_id";
private static final String SEPARATOR = ":";
private static final String CANAL_SERVER_HOST = "192.168.0.104";
private static final int CANAL_SERVER_PORT = 11111;
private static final String CANAL_INSTANCE = "redis";
private static final String USERNAME = "canal";
private static final String PASSWORD = "canal";
@Override
public void run(ApplicationArguments args) throws Exception
this.initCanal();
public void initCanal()
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(CANAL_SERVER_HOST, CANAL_SERVER_PORT),
CANAL_INSTANCE, USERNAME, PASSWORD);
int batchSize = 1000;
try
log.info("启动 canal 数据同步...");
connector.connect();
connector.subscribe(".*\\\\..*");
connector.rollback();
while (true)
// 获取指定数量的数据
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0)
try
// 时间间隔1000毫秒
Thread.sleep(1000);
catch (InterruptedException e)
e.printStackTrace();
else
syncEntry(message.getEntries());
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
finally
connector.disconnect();
private void syncEntry(List<CanalEntry.Entry> entrys)
for (CanalEntry.Entry entry : entrys)
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND)
continue;
CanalEntry.RowChange rowChange;
try
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
catch (Exception e)
throw new RuntimeException("ERROR data:" + entry.toString(), e);
CanalEntry.EventType eventType = rowChange.getEventType();
log.info("================> binlog[:] , name[,] , eventType : ",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType);
String tableName = entry.getHeader().getTableName();
if (!TABLE_NAME.equalsIgnoreCase(tableName)) continue;
for (CanalEntry.RowData rowData : rowChange.getRowDatasList())
if (eventType == CanalEntry.EventType.INSERT)
printColumn(rowData.getAfterColumnsList());
redisInsert(tableName, rowData.getAfterColumnsList());
else if (eventType == CanalEntry.EventType.UPDATE)
printColumn(rowData.getAfterColumnsList());
redisUpdate(tableName, rowData.getAfterColumnsList());
else if (eventType == CanalEntry.EventType.DELETE)
printColumn(rowData.getBeforeColumnsList());
redisDelete(tableName, rowData.getBeforeColumnsList());
private void redisInsert(String tableName, List<CanalEntry.Column> columns)
JSONObject json = new JSONObject();
for (CanalEntry.Column column : columns)
json.put(column.getName(), column.getValue());
for (CanalEntry.Column column : columns)
if (PRIMARY_KEY.equalsIgnoreCase(column.getName()))
String key = tableName + SEPARATOR + column.getValue();
redisUtils.set(key, json.toJSONString());
log.info("redis数据同步新增,key:" + key);
break;
private void redisUpdate(String tableName, List<CanalEntry.Column>使用canal解决Mysql和Redis数据同步(TCP)
前言
之前写过一篇文章《使用canal解决Mysql和Redis数据同步问题》,也是使用canal实现mysql和redis的数据同步,和该篇文章不一样的是,上一篇是基于MQ实现数据同步,该篇文章是基于TCP方式来实现。
工作原理分析
我们在面试的时候常常听面试官问这么一个问题:你们的Mysql和Redis怎么做数据同步的,根据不同的业务场景又很多方案,你可能会说先写库再删缓存,或者延迟双删或其他方案。今天我要给大家分享的就是比较成熟的方案-使用Canal实现Mysql和Redis数据的同步。
我不知道你是否了解Mysql主从,根据2/8原则,80%的性能问题都在读上面,当我们数据库的读并发较大的时候,我们可以使用Mysql主从来分担读的压力。它的原理是所有的写操作在主库上,读操作在从库上,当然主库也可以承担读请求,而从库的数据则通过主库复制而来,Mysql自带主从复制的功能。如下图
主从复制步骤:
- 将Master的binary-log日志文件打开,mysql会把所有的DDL,DML,TCL写入BinaryLog日志文件中
- Master会生成一个 log dump 线程,用来给从库的 i/o线程传binlog
- 从库的i/o线程去请求主库的binlog,并将得到的binlog日志写到中继日志(relaylog)中
- 从库的sql线程,会读取relaylog文件中的日志,并解析成具体操作,通过主从的操作一致,而达到最终数据一致
而Canal的原理就是伪装成Slave从Binlog中复制SQL语句或者数据。
Mysql和Redis数据同步方案
根据上面所说,我们就可以通过Canal去自动同步数据库的binlog数据日志文件,然后再把数据同步到Redis,从而达到Mysql和Redis自动同步的功能。很遗憾的是Canal没办法直接把数据库同步到Redis,它支持的是组件有 : mysql、Kafka、ElasticSearch、Hbase、RocketMQ等
当然 canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑
- canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample
- canal c# 客户端: https://github.com/dotnetcore/CanalSharp
- canal go客户端: https://github.com/CanalClient/canal-go
- canal Python客户端: https://github.com/haozi3156666/canal-python
canal 作为 MySQL binlog 增量获取和解析工具,可将数据通过TCP协议将数据同步到canal-client也就是我们的应用中,因此我们可以使用下面这种方案来同步数据
- 首选需要开启Mysql的bin-log
- 然后需要安装canal-server伪装成slave同步mysql中的数据
- 编写canal-client客户端监听canal-server,把数据从canal-server中同步过来
- 然后把拿到的数据写入Redis即可
开启Mysql bin-log日志
找到Mysql安装目录中的my.ini 配置文件,我以mysql 5.5为例,在 mysqld 下做如下配置
[mysqld]
#开启bInlog
log-bin=mysql-bin
#给mysql服务指定一个唯一的ID
server-id=1
#以数据的方式写binlog日志 :statement 是记录SQL,row是记录数据
binlog-format=ROW
#同步的数据库名
#binlog-do-db=canaldb
#忽略的表
binlog-ignore-db=mysql
# 启动mysql时不启动grant-tables授权表
skip-grant-tables
修改好之后,重启Mysql服务。注意:我这里指定了需要同步的数据库为canaldb,所以需要创建一个数据库,同时创建了一个employee表作为演示
然后创建一个用户提供给canal来链接Mysql做数据同步
flush privileges;
#创建用户cannal
CREATE USER canal IDENTIFIED BY 'canal';
#把所有权限赋予canal,密码也是canal
GRANT ALL PRIVILEGES ON canaldb.user TO 'canal'@'%' identified by "canal";
//GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' identified by "canal";
#刷新权限
flush privileges;
到这,Mysql部分就搞定了
安装Canal
去官网下载 Canal : https://github.com/alibaba/canal/releases ,我使用的是canal.deployer-1.1.5.tar.gz版本
下载好之后解压,目录结构如下
接下来修改instance 配置文件 : conf/example/instance.properties
# 按需修改成自己的数据库信息
#################################################
...
#我的端口是3307
canal.instance.master.address=192.168.1.20:3307
# username/password,数据库的用户名和密码
...
#刚才开通的mysql的账户密码
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# 同步的表的规则
# table regex
# 同步所有表
#canal.instance.filter.regex=.*\\\\..*
# 同步多个表,用逗号隔开
canal.instance.filter.regex=canaldb.employee,canaldb.dept
#################################################
...省略...
这里注意如下几个东西,其他的不用管
- master.address :Mysql的地址,我的端口是3307,默认是3306
- dbUsername :上面开通的Mysql用户
- dbPassword : 密码
- ccanal.instance.filter.regex : 要同步的表,多个表用逗号隔开
接着修改canal 配置文件 conf/canal.properties
# ...
# 可选项: tcp(默认), kafka, RocketMQ
# 这里使用tcp , 还支持kafka和rocketmq
canal.serverMode = tcp
...省略...
这里需要注意 : canal.serverMode = tcp: 我这里以tcp为例,指的是以tcp协议把数据同步数据,而不是同步到mq
配置好之后,找到 canal 安装目录下 bin目录下的 startup.bat 双击启动,linux上启动:startup.sh
编写canal-client
接下来我们需要在项目中整合canal-client来同步canal-server中的数据,然后写入Redis
第一步:导入如下依赖,我这里使用了 canal-spring-boot-starter
来整合canal-client
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--Canal 依赖-->
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.50</version>
</dependency>
</dependencies>
第二步:在yaml配置canal地址,以及Redis相关参数
canal:
server: 127.0.0.1:11111 #canal的地址
destination: example #默认的数据同步的目的地
spring:
redis:
host: 127.0.0.1
password: 123456
编写启动类
@SpringBootApplication
public class CanalApplication
public static void main(String[] args)
SpringApplication.run(CanalApplication.class,args);
第三步:对Redis做配置,实现自动序列化
//缓存的配置
@Configuration
public class RedisConfig
@Resource
private RedisConnectionFactory factory;
//使用JSON进行序列化
@Bean
public RedisTemplate<Object, Object> redisTemplate()
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(factory);
//JSON格式序列化
GenericFastJsonRedisSerializer serializer = new GenericFastJsonRedisSerializer();
//key的序列化
redisTemplate.setKeySerializer(serializer);
//value的序列化
redisTemplate.setValueSerializer(serializer);
//hash结构key的虚拟化
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
//hash结构value的虚拟化
redisTemplate.setHashValueSerializer(serializer);
return redisTemplate;
第四步:编写实体类,对应要同步的数据库的表
@Data
public class Employee
private Long id;
private String username;
第五步:编写数据同步处理器,canal-client提供了EntryHandler,该handler中提供了insert,delete,update方法,当监听到某张表的相关操作后,会回调对应的方法把数据传递进来,我们就可以拿到数据往Redis同步了。
@CanalTable("employee")
@Component
@Slf4j
public class EmployeeHandler implements EntryHandler<Employee>
//把数据往Redis同步
@Autowired
private RedisTemplate<Object,Object> redisTemplate;
@Override
public void insert(Employee employee)
redisTemplate.opsForValue().set("EMP:"+employee.getId(),employee);
@Override
public void delete(Employee employee)
redisTemplate.delete("EMP:"+employee.getId());
@Override
public void update(Employee before, Employee after)
redisTemplate.opsForValue().set("EMP:"+after.getId(),after);
- @CanalTable(“employee”) :监听的表
EntryHandler<Employee>
: 拿到employee表的改变后的数据之后,会封装为Employee实体 投递给我们
到这里代码就编写完成了,启动程序可以从控制台看到canal-client在不同尝试获取数据
启动redis后, 尝试手动修改数据库 employee表中的数据,然后实例redis-cli 查看 数据,下面是表中的数据
下面是redis中的数据
好了文章就到这里把,喜欢的话请给个好评,一不小心来个一键三连就更好啦!!!
以上是关于基于Canal的Mysql&Redis数据同步实现的主要内容,如果未能解决你的问题,请参考以下文章
转载:使用canal让redis中的数据与mysql数据库中的保持同步