canal +RocketMQ实现MySQL与ElasticSearch数据同步
Posted 请叫我东子
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal +RocketMQ实现MySQL与ElasticSearch数据同步相关的知识,希望对你有一定的参考价值。
1.引言
在很多业务情况下,我们都会在系统中引入ElasticSearch
搜索引擎作为做全文检索的优化方案。
如果数据库数据发生更新,这时候就需要在业务代码中写一段同步更新ElasticSearch
的代码。
这种数据同步的代码跟业务代码耦合性非常高,并且使得代码的可读性降低,于是乎,我们能不能把这些数据同步的代码抽出来形成一个独立的模块呢?肯定是可以的。
下面我会以一个CMS文章管理
为例来演示canal
+RocketMQ
实现mysql
与ElasticSearch
数据同步。
2.技术栈
如果你还对SpringBoot
、canal
、RocketMQ
、MySQL
、ElasticSearch
不是很了解的话,这里我为大家整理个它们的官网网站,如下
-
SpringBoot:https://spring.io/projects/spring-boot
-
RocketMQ:http://rocketmq.apache.org/
-
MySQL:https://www.mysql.com/
-
ElasticSearch:https://www.elastic.co/cn/elasticsearch/
这里主要介绍一下canal,其他的自行学习。
2.1 canal定义
canal
[kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费.。
2.2 canal工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
2.3 架构
说明:
- server代表一个canal运行实例,对应于一个jvm
- instance对应于一个数据队列 (1个server对应1…n个instance)
instance模块:
- eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
- eventStore (数据存储)
- metaManager (增量订阅&消费信息管理器)
到这里我们对canal
有了一个初步的认识,接下我们就进入实战环节。
3.环境准备
3.1 MySQL 配置
对于自建 MySQL
, 需要先开启 Binlog
写入功能,配置binlog-format
为ROW
模式,my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
**注意:**针对阿里云 RDS for MySQL
, 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
授权canal
连接 MySQL 账号具有作为 MySQL slave
的权限, 如果已有账户可直接 使用grant 命令授权。
#创建用户名和密码都为canal
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
3.2 canal的安装和配置
3.2.1 canal.admin安装和配置
canal提供web ui 进行Server管理、Instance管理。
3.2.1.1 下载 canal.admin, 访问 release 页面 , 选择需要的包下载, 如以 1.1.4版本为例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
3.2.1.2 解压完成可以看到如下结构:
我们先配置canal.admin之后。通过web ui来配置 cancal server,这样使用界面操作非常的方便。
3.2.1.3 配置修改
vi conf/application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 127.0.0.1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://$spring.datasource.address/$spring.datasource.database?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
3.2.1.4 初始化元数据库
初始化元数据库
mysql -h127.1 -uroot -p
# 导入初始化SQL
> source conf/canal_manager.sql
-
初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化
-
canal_manager.sql默认会在conf目录下,也可以通过链接下载
canal_manager.sql
3.2.1.5 启动
sh bin/startup.sh
3.2.1.6 启动成功,使用浏览器输入http://ip:8089/ 会跳转到登录界面
使用用户名:admin 密码为:123456 登录
登录成功,会自动跳转到如下界面。这时候我们的canal.admin就搭建成功了。
3.2.2 下载 canal.deployer, 访问 release 页面 , 选择需要的包下载, 如以 1.1.4版本为例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
解压完成可以看到如下结构:
进入conf 目录。可以看到如下的配置文件。
我们先对canal.properties
不做任何修改。
使用canal_local.properties
的配置覆盖canal.properties
# register ip
canal.register.ip =
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
使用如下命令启动canal server
sh bin/startup.sh local
启动成功。同时我们在canal.admin web ui中刷新 server 管理,可以到canal server 已经启动成功。
这时候我们的canal.server 搭建已经成功。
3.2.3 在canal admin ui 中配置Instance管理
3.2.3.1 新建 Instance
选择Instance 管理-> 新建Instance
填写 Instance名称:cms_article
选择 选择所属主机集群
选择 载入模板
修改默认信息
#mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
#改成自己的数据库信息(需要监听的数据库)
canal.instance.defaultDatabaseName = cms-manage
canal.instance.connectionCharset = UTF-8
#table regex 需要过滤的表 这里数据库的中所有表
canal.instance.filter.regex = .\\*\\\\..\\*
# MQ 配置 日志数据会发送到cms_article这个topic上
canal.mq.topic=cms_article
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\\\..*,.*\\\\..*
#单分区处理消息
canal.mq.partition=0
我们这里为了演示之创建一张表。
配置好之后,我需要点击保存。此时在Instances 管理中就可以看到此时的实例信息。
3.2.4 修改canal server 的配置文件,选择消息队列处理binlog
canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:
- kafka: https://github.com/apache/kafka
- RocketMQ : https://github.com/apache/rocketmq
本案例以RocketMQ
为例
我们仍然使用web ui 界面操作。点击 server 管理 - > 点击配置
修改配置文件
# ...
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = RocketMQ
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 192.168.0.200:9078
canal.mq.retries = 0
# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下请将该值改大, 建议50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false
修改好之后保存。会自动重启。
此时我们就可以在rocketmq的控制台看到一个cms_article topic已经自动创建了。
3.2.5 配置ElasticSearch启动
这里我使用的ElasticSearch 6.6.1
es 启动成功了。
我们使用 elasticsearch-head 连接是可以看到节点信息。一会我们就使用 elasticsearch-head 查询es中数据。
4.代码实战
4.1 创建一个springboot 项目
项目结构如下:
4.2 pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rockmq-samples</artifactId>
<groupId>com.lidong.rocketmq</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>springboot-canal-rocketmq-es</artifactId>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.2.5.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>$spring-boot.version</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.0.RELEASE</version>
<configuration>
<mainClass>com.lidong.RocketmqSyncSamplesApplication</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- spring-boot-starter-data-elasticsearch:操作es依赖库
- rocketmq-spring-boot-starter:操作rocketmq依赖库
其他就不过多介绍了。大家一看就明白了。
4.3 application的配置
server:
port: 8085
rocketmq:
name-server: localhost:9876
spring:
data:
elasticsearch:
cluster-nodes: localhost:9300
cluster-name: my-application
repositories:
enabled: true
- rocketmq.name-server : rocketmq的namesver
- spring.data.elasticsearch.cluster-nodes :es 节点地址
- spring.data.elasticsearch.cluster-name: es节点集群名称
- spring.data.elasticsearch.repositories.enabled :开启es仓库使用
4.4 创建es操作的实体类和仓库类
4.4.1 EsCmsArticle实体类
package com.lidong.canal.es.entity;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import java.io.Serializable;
import java.util.Date;
/**
* 文章详情
*
* String indexName();//索引库的名称,个人建议以项目的名称命名
* String type() default "";//类型,个人建议以实体的名称命名
* short shards() default 5;//默认分区数
* short replicas() default 1;//每个分区默认的备份数
* String refreshInterval() default "1s";//刷新间隔
* String indexStoreType() default "fs";//索引文件存储类型
*
**/
@Document(indexName = "canal-rocketmq-es", type = "cms-article")
public class EsCmsArticle implements Serializable
@Id
private Long courseId;
/** 标题 */
private String title;
/** 摘要 */
private String abstractX;
/** 内容 */
private String content;
/** 年龄段 */
private String ageRange;
/** 图片 */
private String image;
/** 查看次数 */
private Long viewNumber;
/** 作者 */
private String author;
/** 来源 */
private String source;
/** 所属分类 */
private Long classId;
/** 关键字 */
private String keyWords;
/** 描述 */
private String description;
/** 文章url */
private String url;
/**
* 文章状态
*/
private Integer status;
/**
* 创建时间
*/
private Date createTime;
/**
* 修改时间
*/
private Date updateTime;
public void setCourseId(Long courseId)
this.courseId = courseId;
public Long getCourseId()
return courseId;
public void setTitle(String title)
this.title = title;
public String getTitle()
return title;
public void setAbstractX(String abstractX)
this.abstractX = abstractX;
public String getAbstractX()
return abstractX;
public void setContent(String content)
this.content = content;
public String getContent()
return content;
public void setAgeRange(String ageRange)
this.ageRange = ageRange;
public String getAgeRange()
return ageRange;
public void setImage(String image)
this.image = image;
public String getImage()
return image;
public void setViewNumber(Long viewNumber)
this.viewNumber = viewNumber;
public Long getViewNumber()
return viewNumber;
public void setAuthor(String author)
this.author = author;
public String getAuthor()
return author;
public void setSource(String source)
this.source = source;
public String getSource()
return source;
public void setClassId(Long classId)
this.classId = classId;
public Long getClassId()
return classId;
public void setKeyWords(String keyWords)
this.keyWords = keyWords;
public String getKeyWords()
return keyWords;
public void setDescription(String description)
this.description = description;
public String getDescription()
return description;
public void setUrl(String url)
this.url = url;
public String getUrl()
return url;
public Integer getStatus()
return status;
public void setStatus(Integer status)
this.status = status;
public Date getCreateTime()
return createTime;
public void setCreateTime(Date createTime)
this.createTime = createTime;
public Date getUpdateTime()
return updateTime;
public void setUpdateTime(Date updateTime)
this.updateTime = updateTime;
@Override
public String toString()
return "CmsArticle" +
"courseId=" + courseId +
", title='" + title + '\\'' +
", abstractX='" + abstractX + '\\'' +
", content='" + content + '\\'' +
", ageRange='" + ageRange + '\\'' +
", image='" + image + '\\'' +
", viewNumber=" + viewNumber +
", author='" + author + '\\'' +
", source='" + source + '\\'' +
", classId=" + classId +
", keyWords='" + keyWords + '\\'' +
", description='" + description + '\\'' +
", url='" + url + '\\'' +
", status=" + status +
", createTime=" + createTime +
", updateTime=" + updateTime +
'';
4.4.2 CmsArticleRepository 仓库类
package com.lidong.canal.rocketmq;
import com.alibaba.fastjson.JSON;
import com.lidong.canal.bean.CanalBean;
import com.lidong.canal.bean.CmsArticle;
import com.lidong.canal.es.entity.EsCmsArticle;
import com.lidong.canal.es.repository.CmsArticleRepository;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
@Component
@RocketMQMessageListener(
topic = "cms_article"以上是关于canal +RocketMQ实现MySQL与ElasticSearch数据同步的主要内容,如果未能解决你的问题,请参考以下文章
Canal利用canal实现mysql实时增量备份并对接kafka
docker部署canal 1.1.6 rocketmq 分区顺序性