mysql进阶:canal实现mysql数据同步到redis|实现自定义canal客户端
Posted wu@55555
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mysql进阶:canal实现mysql数据同步到redis|实现自定义canal客户端相关的知识,希望对你有一定的参考价值。
0. 引言
我们在做mysql与redis的数据同步时,往往采用的是代码层实现,或者通过spring-cache等缓存框架。但是仍然有某些场景,比如说原项目无源码,或者不能进行二开时,就需要独立的第三方来实现数据同步。
我们需要一种无代码入侵式的数据同步,完全由第三方组件管理。
这就需要借助canal来实现mysql到redis的数据同步
1. canal简介
canal是阿里开源的数据同步工具,基于bin log可以将数据库同步到其他各类数据库中,目标数据库支持mysql,postgresql,oracle,redis,MQ,ES等
canal分成服务端deployer和客户端adapter,我们可以部署多个,同时为了方便管理还提供了一个管理端admin
canal的数据同步流程如下图所示
因为目前canal还不能直接通过配置就实现对redis的数据同步,因此我们需要自定义一下canal客户端,通过服务端将数据同步到客户端后,由客户端自定义操作同步到redis
2. 安装
2.1 安装jdk
canal是基于java环境的,因此运行前需要先安装jdk,这里我安装的是jdk11。详细步骤就不再累述了。
canal1.1.5使用jdk1.8即可,以下示例的是canal1.1.6。该版本需要使用jdk11+,否则会报错NoSuchMethodError
,详细报错信息如下:
java.lang.NoSuchMethodError: java.nio.ByteBuffer.clear()Ljava/nio/ByteBuffer;
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:412) ~[na:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:397) ~[na:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:155) ~[na:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:116) ~[na:na]
at com.alibaba.otter.canal.connector.tcp.consumer.CanalTCPConsumer.connect(CanalTCPConsumer.java:63) ~[na:na]
at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.process(AdapterProcessor.java:185) ~[client-adapter.launcher-1.1.6.jar:na]
2.2 安装canal
1、截止本文,canal的稳定版已更新到1.1.6了, 所以本文也以这个版本为例。
这里因为我们要自定义客户端,所以只用下载服务端deployer即可
当然也可以通过wget指令直接下载到服务器
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
详细的安装步骤不再累述了,还不清楚的同学可以参考上一篇文章
通过canal来实现mysql数据同步到elasticsearch
3. 实现同步
3.1 mysql操作步骤
1、因为同步是基于binlog实现的,所以要现在mysql中开启binlog
修改mysql配置文件
vim /etc/my.cnf
修改内容
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
2、源数据库创建一个canal账号,并且设置slave
,dump
权限
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3、因为mysql8.0.3后身份检验方式为caching_sha2_password
,但canal使用的是mysql_native_password
,因此需要设置检验方式(如果该版本之前的可跳过),否则会报错IOException: caching_sha2_password Auth failed
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
select host,user,plugin from mysql.user ;
4、创建一个canal_manager
数据库,编码格式utf8mb4
(如果未安装admin管理服务则不需要该数据库)
该数据库用于远程统一配置管理
导入脚本canal_manager.sql
,初始化数据库结构数据
该脚本文件在canal.admin
下的conf目录中
3.2 服务端deployer操作
1、查询源mysql服务器的binlog位置
# 源mysql服务器中登陆mysql执行
show binary logs;
2、进入deployer安装目录
cd deployer
3、我们新建一个实例redis
专门用于本次演示
cd conf
# 复制example实例配置
cp -R example redis
4、修改实例redis配置文件instance.properties
cd redis
vim instance.properties
修改内容
# position info
# 源数据库地址及端口
canal.instance.master.address=192.168.244.17:3306
# 开始同步的binlog日志文件,注意这里的binlog文件名以你自己查出来的为准
canal.instance.master.journal.name=binlog.000007
# 开始同步的binlog文件位置
canal.instance.master.position=0
# 开始同步时间点 时间戳形式
canal.instance.master.timestamp=1546272000000
# 数据库账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 配置不同步mysql库
canal.instance.filter.black.regex=mysql\\..*
mysql数据同步起点说明:
- canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动
- canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动
- 不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)
5、启动服务端
./bin/start.sh
6、查看示例日志,无报错则说明启动成功
cat logs/redis/redis.log
针对服务端的详细配置项解释,可以参考官方文档:
3.3 客户端client操作
1、创建springboot项目,引入依赖spring-data-redis
,fastjson
,lombok
,canal.client
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.72</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
这里需要注意的是官方提供的canal-client依赖如下所示,而上述我们引入的是封装过的第三方包,更加易用。但该canal-spring-boot-starter
依赖包目前已经停止维护了,最新版对应的canal-client还是1.2.1-RELEASE
版本的。不过不影响我们使用,如果有需要可以下载源码二次开发。
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>$canal.version</version>
</dependency>
2、修改配置文件application.yml
# 应用名称
spring:
application:
name: canal_client_redis
redis:
host: 127.0.0.1
password:
# 应用服务 WEB 访问端口
server:
port: 8080
# canal服务端地址
canal:
server: 192.168.244.22:11111
# 实例名,与deployer中配置的保持统一
destination: redis
# 设置canal消息日志打印级别
logging:
level:
top.javatool.canal.client: warn
3、因为我们同步的是数据对象,需要进行json序列化,于是配置redis序列化方式为json,创建如下配置类
/**
* @author benjamin_5
* @Description
* @date 2022/9/25
*/
@Configuration
@AllArgsConstructor
public class RedisConfig
private RedisConnectionFactory factory;
/**
* 设置json序列化
* @return
*/
@Bean
public RedisTemplate<Object,Object> redisTemplate()
RedisTemplate<Object,Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(factory);
// json序列化
GenericFastJsonRedisSerializer serializer = new GenericFastJsonRedisSerializer();
// key设置json序列化
redisTemplate.setKeySerializer(serializer);
// value设置json序列化
redisTemplate.setValueSerializer(serializer);
// hash结构key设置json序列化
redisTemplate.setHashKeySerializer(serializer);
// hash结构value设置json序列化
redisTemplate.setHashValueSerializer(serializer);
return redisTemplate;
4、创建实体类,与同步的表格对应
这里需要注意的是,因为mysql中的字段以下划线命名法,实体类中以驼峰命名法,直接同步的话会导致字段名不匹配,于是我们需要通过JPA
注解来映射字段名
/**
* @author benjamin_5
* @Description
* @date 2022/9/25
*/
@Data
@Table(name = "bs_ecif")
public class BsEcif implements Serializable
@Column(name="seq_no")
private Long seqNo;
@Column(name="customer")
private String customer;
@Column(name="id_type")
private String idType;
@Column(name="id_no")
private String idNo;
@Column(name="new_field")
private String newField;
@Override
public String toString()
return "BsEcif" +
"seqNo=" + seqNo +
", customer='" + customer + '\\'' +
", idType='" + idType + '\\'' +
", idNo='" + idNo + '\\'' +
", newField='" + newField + '\\'' +
'';
5、canal-spring-boot-starter
包提供了EntryHandler
类用于监控表数据更新,于是我们创建一个EntryHandler
实现类,用于实现redis的增删改
/**
* @author benjamin_5
* @Description
* @date 2022/9/25
*/
@CanalTable("bs_ecif")
@Component
@AllArgsConstructor
@Slf4j
public class BsEcifHandler implements EntryHandler<BsEcif>
private final RedisTemplate<Object,Object> redisTemplate;
@Override
public void insert(BsEcif bsEcif)
log.info("[新增]"+bsEcif.toString());
redisTemplate.opsForValue().set("bs_ecif:"+bsEcif.getSeqNo(),bsEcif);
@Override
public void update(BsEcif before, BsEcif after)
log.info("[更新]"+after.toString());
redisTemplate.opsForValue().set("bs_ecif:"+after.getSeqNo(),after);
@Override
public void delete(BsEcif bsEcif)
log.info("[删除]"+bsEcif.getSeqNo());
redisTemplate.delete("bs_ecif:"+bsEcif.getSeqNo());
最后提供两个官方示例给大家参考,官方示例采用的是canal-client
包,未经过封装
3.4 测试
1、启动项目
2、数据库中添加一行数据
3、查看日志打印
4、连接redis,发现新增数据同步成功
5、修改数据
6、查看redis,同步成功
7、删除数据
8、查看redis,发现数据也同步删除了
至此我们所有类型的操作都测试完成
演示代码下载
本次演示代码,可在如下地址下载,供大家参考:
总结
自此我们针对mysql同步到redis的演示就结束了,除了配置实现外,也带大家体验了自定义客户端的实现,以更加易用的第三方封装canal-client包来快速实现数据同步。想要加深理解,真正掌握还需要大家亲自动手操作试试。
我正在申请认证,如果本文对你有帮助的话,希望点个赞支持一下
实战 | canal 实现Mysql到Elasticsearch实时增量同步
题记
关系型数据库Mysql/Oracle增量同步Elasticsearch是持续关注的问题,也是社区、QQ群等讨论最多的问题之一。 问题包含但不限于: 1、Mysql如何同步到Elasticsearch? 2、Logstash、kafka_connector、canal选型有什么不同,如何取舍? 3、能实现同步增删改查吗? ..... 本文给出答案。
1、Canal同步
1.1 canal官方已支持Mysql同步ES6.X
同步原理,参见之前: 干货 | Debezium实现Mysql到Elasticsearch高效实时同步。
canal 1.1.1版本之后, 增加客户端数据落地的适配及启动功能。canal adapter 的 Elastic Search 版本支持6.x.x以上。 需要借助adapter实现。
1.2 同步效果
1)已验证:仅支持增量同步,不支持全量已有数据同步。这点,canal的初衷订位就是“阿里巴巴mysql数据库binlog的增量订阅&消费组件”。
2)已验证:由于采用了binlog机制,Mysql中的新增、更新、删除操作,对应的Elasticsearch都能实时新增、更新、删除。
3)推荐使用场景 canal适用于对于Mysql和Elasticsearch数据实时增、删、改要求高的业务场景。 实时场景要求不高的业务场景,logstashinputjdbc也能满足。
建议,做好选型甄别。
2、同步版本:
- ES:6.6.1
- Mysql: 5.7.25
- canal:v1.1.3-alpha-2
- canal-adapter:v1.1.3-alpha-2
- canal下载地址:https://github.com/alibaba/canal/releases
3、同步步骤解读
3.1 启动canal,可作为常驻进程后台运行。
官网已有详细描述https://github.com/alibaba/canal/wiki/QuickStart, 以下仅列举关键注意事项。
对应下载文件:canal.deployer-1.1.3-SNAPSHOT.tar.gz, 可以实时关注最新版本。
3.1.1 启用binlog
canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,建议配置binlog模式为row.
[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
3.1.2 修改配置文件
vi conf/example/instance.properties
配置数据库基本信息。
3.1.3 启动canal
bin/startup.sh可通过日志排查错误。
3.2 配置ElasticSearch适配器,并实现同步。
官网已有详细描述:https://github.com/alibaba/canal/wiki/Sync-ES。 以下仅针对部署遇到的坑做描述。
3.2.1 部署版本
anal.adapter-1.1.3-SNAPSHOT.tar.gz,如有更新,建议使用最新版本。
3.2.2 核心配置
[root@localhost es]# cat mytest_user.yml
dataSourceKey: defaultDS
destination: example
esMapping:
_index: baidu_index
_type: _doc
_id: _id
pk: id
sql: "select a.id as _id, a.title, a.url, a.publish_time, a.content,
from baidu_info as a"
# objFields:
# _labels: array:;
etlCondition: "where a.id >= 1"
commitBatch: 3000
实现目的:库表id字段作为Elasticsearch的_id,以期实现自增。
4、多表关联实现
建议参考官网:https://github.com/alibaba/canal/wiki/Sync-ES 支持:
- 一对一
- 一对多
- 多对多
5、坑
坑1:canal.adapter-1.1.2 启动失败
启动失败:https://github.com/alibaba/canal/issues/1513 该问题在1.1.3版本已经修复。
坑2:不支持全量同步
全量同步建议使用logstash或者其他工具:
坑3:必须先在ES创建好对应索引的Mapping
否则,会没有识别索引,会报写入错误。
坑4:多张表的同步如何实现?
在canal.adapter-1.1.3/conf/es的新增*.yml配置即可。 也就是说,可以一张Mysql表一个配置文件。
坑5:空指针异常错误
解决方案:sql语句部分,指定对应库表id为ES中的_id,否则会报错。 举例:
select sx_sid as _id, name from baidu_info
坑6:基于 row 模式的 binlog 会不会记录变更前、变更后的值呢?
- INSERT:只有变更后的值。
- UPDATE:包含了变更前、变更后的值。
- DELETE:变更前的值
关于全量同步:https://github.com/alibaba/canal/issues/376
6 同步选型小结
以上不同选型各有利弊,建议 结合实际业务斟酌选择。 欢迎留下你的同步实践方案和思考。
加入星球,和大佬一起沉淀技术!
以上是关于mysql进阶:canal实现mysql数据同步到redis|实现自定义canal客户端的主要内容,如果未能解决你的问题,请参考以下文章
canal+kafka实现mysql与redis数据同步(centos7)
docker环境安装mysqlcanalelasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中