2021-10-24 基于Docker结合Canal实现MySQL实时增量数据传输功能
Posted 愚公搬代码
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2021-10-24 基于Docker结合Canal实现MySQL实时增量数据传输功能相关的知识,希望对你有一定的参考价值。
基于Docker结合Canal实现mysql实时增量数据传输功能
主要介绍了基于Docker结合Canal实现MySQL实时增量数据传输功能,本文给图文并茂给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
Canal的介绍
Canal的历史由来
在早期的时候,阿里巴巴公司因为杭州和美国两个地方的机房都部署了数据库实例,但因为跨机房同步数据的业务需求 ,便孕育而生出了Canal,主要是基于trigger(触发器)的方式获取增量变更。从
2010年开始,阿里巴巴公司开始逐步尝试数据库日志解析,获取增量变更的数据进行同步,由此衍生出了增量订阅和消费业务。
当前的Canal支持的数据源端MySQL版本包括:5.1.x 、5.5.x 、5.6.x、5.7.x、8.0.x。
Canal的应用场景
目前普遍基于日志增量订阅和消费的业务,主要包括:
- 基于数据库增量日志解析,提供增量数据订阅和消费
- 数据库镜像 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务Cache刷新
- 带业务逻辑的增量数据处理
- Canal的工作原理
在介绍Canal的原理之前,我们先来了解下MySQL主从复制的原理。
MySQL主从复制原理
- MySQL Master将数据变更的操作写入二进制日志binary log中, 其中记录的内容叫做二进制日志事件binary log
events,可以通过show binlog events命令进行查看 - MySQL Slave会将Master的binary log中的binary log events拷贝到它的中继日志relay log
MySQL Slave重读并执行relay log中的事件,将数据变更映射到它自己的数据库表中 - 了解了MySQL的工作原理,我们可以大致猜想到Canal应该也是采用类似的逻辑去实现增量数据订阅的功能,那么接下来我们看看实际上Canal的工作原理是怎样的?
Canal工作原理
-
Canal模拟MySQL Slave的交互协议,伪装自己为MySQL Slave,向MySQL Master发送dump协议
-
MySQL Master收到dump请求,开始推送binary log给Slave(也就是Canal) Canal解析binary
-
log对象(数据为byte流)
基于这样的原理与方式,便可以完成数据库增量日志的获取解析,提供增量数据订阅和消费,实现MySQL实时增量数据传输的功能。
既然Canal是这样的一个框架,又是纯Java语言编写而成,那么我们接下来就开始学习怎么使用它并把它用到我们的实际工作中。
Canal的Docker环境准备
因为目前容器化技术的火热,本文通过使用Docker来快速搭建开发环境,而传统方式的环境搭建,在我们学会了Docker容器环境搭建后,也能自行依葫芦画瓢搭建成功。由于本篇主要讲解Canal,所以
关于Docker的内容不会涉及太多,主要会介绍Docker的基本概念和命令使用。 如果你想和更多容器技术专家交流,可以加我微信liyingjiese,备注『加群』。群里每周都有全球各大公司的最佳实践以及
行业最新动态 。
什么是Docker
相信绝大多数人都使用过虚拟机VMware,在使用VMware进行环境搭建的时候,只需提供了一个普通的系统镜像并成功安装,剩下的软件环境与应用配置还是如我们在本机操作一样在虚拟机里也操作一
遍,而且VMware占用宿主机的资源较多,容易造成宿主机卡顿,而且系统镜像本身也占用过多空间。
为了便于大家快速理解Docker,便与VMware做对比来做介绍,Docker提供了一个开始,打包,运行APP的平台,把APP(应用)和底层infrastructure(基础设施)隔离开来。Docker中最主要的两个概
念就是镜像(类似VMware的系统镜像)与容器(类似VMware里安装的系统)。
什么是Image(镜像)
- 文件和meta data的集合(root filesystem)
- 分层的,并且每一层都可以添加改变删除文件,成为一个新的image
- 不同的image可以共享相同的layer
- Image本身是read-only的
什么是Container(容器)
- 通过Image创建(copy)
- 在Image layer之上建立一个container layer(可读写)
- 类比面向对象:类和实例
- Image负责APP的存储和分发,Container负责运行APP
Docker的网络介绍
Docker的网络类型有三种: - Bridge:桥接网络。默认情况下启动的Docker容器,都是使用Bridge,Docker安装时创建的桥接网络,每次Docker容器重启时,会按照顺序获取对应的IP地址,这个就导致重启下,Docker的IP地
址就变了。 - None:无指定网络。使用 --network=none,Docker容器就不会分配局域网的IP。
- Host:主机网络。使用–network=host,此时,Docker容器的网络会附属在主机上,两者是互通的。例如,在容器中运行一个Web服务,监听8080端口,则主机的8080端口就会自动映射到容器
中。
创建自定义网络:(设置固定IP)
docker network create --subnet=172.18.0.0/16 mynetwork
查看存在的网络类型docker network ls:
搭建Canal环境
附上Docker的下载安装地址==> Docker Download 。
下载Canal镜像docker pull canal/canal-server:
下载MySQL镜像docker pull mysql,下载过的则如下图:
查看已经下载好的镜像docker images:
接下来通过镜像生成MySQL容器与canal-server容器:
生成mysql容器
docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql
生成canal-server容器
docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server
命令介绍
–net mynetwork #使用自定义网络
–ip #指定分配ip
查看Docker中运行的容器docker ps:
MySQL的配置修改
以上只是初步准备好了基础的环境,但是怎么让Canal伪装成Salve并正确获取MySQL中的binary log呢?
对于自建MySQL,需要先开启Binlog写入功能,配置binlog-format为ROW模式,通过修改MySQL配置文件来开启bin_log,使用find / -name my.cnf查找my.cnf,修改文件内容如下:
[mysqld]
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 选择ROW模式
server_id=1 # 配置MySQL replaction需要定义,不要和Canal的slaveId重复
进入MySQL容器docker exec -it mysql bash。
创建链接MySQL的账号Canal并授予作为MySQL slave的权限,如果已有账户可直接GRANT:
mysql -uroot -proot
创建账号
CREATE USER canal IDENTIFIED BY ‘canal’;
授予权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;
– GRANT ALL PRIVILEGES ON . TO ‘canal’@’%’ ;
刷新并应用
FLUSH PRIVILEGES;
数据库重启后,简单测试 my.cnf 配置是否生效:
show variables like ‘log_bin’;
show variables like ‘log_bin’;
show master status;
canal-server的配置修改
进入canal-server容器docker exec -it canal-server bash。
编辑canal-server的配置vi canal-server/conf/example/instance.properties:
更多配置请参考==>Canal配置说明 。
重启canal-server容器docker restart canal-server 进入容器查看启动日志:
docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log
至此,我们的环境工作准备完成!
拉取数据并同步保存到ElasticSearch
本文的ElasticSearch也是基于Docker环境搭建,所以读者可执行如下命令:
下载对镜像
docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine
创建容器并运行
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e “discovery.type=single-node” elasticsearch:7.1.1
docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine
环境已经准备好了,现在就要开始我们的编码实战部分了,怎么通过应用程序去获取Canal解析后的binlog数据。首先我们基于Spring Boot搭建一个canal demo应用。结构如下图所示:
Student.java
package com.example.canal.study.pojo;
import lombok.Data;
import java.io.Serializable;
// @Data 用户生产getter、setter方法 @Data
public class Student implements Serializable {
private String id;
private String name;
private int age;
private String sex;
private String city;
}
CanalConfig.java
package com.example.canal.study.common;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
/**
* @author haha
*/@Configuration
public class CanalConfig {
// @Value 获取 application.properties配置中端内容 @Value("${canal.server.ip}")
private String canalIp; @Value("${canal.server.port}")
private Integer canalPort; @Value("${canal.destination}")
private String destination; @Value("${elasticSearch.server.ip}")
private String elasticSearchIp; @Value("${elasticSearch.server.port}")
private Integer elasticSearchPort; @Value("${zookeeper.server.ip}")
private String zkServerIp;
// 获取简单canal-server连接 @Bean
public CanalConnector canalSimpleConnector() { CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp, canalPort), destination, "", "");
return canalConnector;
}
// 通过连接zookeeper获取canal-server连接 @Bean
public CanalConnector canalHaConnector() { CanalConnector canalConnector = CanalConnectors.newClusterConnector(zkServerIp, destination, "", "");
return canalConnector;
}
// elasticsearch 7.x客户端 @Bean
public RestHighLevelClient restHighLevelClient() { RestHighLevelClient client = new RestHighLevelClient( RestClient.builder(new HttpHost(elasticSearchIp, elasticSearchPort))
);
return client;
}
}
CanalDataParser.java
由于这个类的代码较多,文中则摘出其中比较重要的部分,其它部分代码可从GitHub上获取:
public static class TwoTuple<A, B> {
public final A eventType;
public final B columnMap;
public TwoTuple(A a, B b) {
eventType = a;
columnMap = b;
}
}
public static List<TwoTuple<EventType, Map>> printEntry(List<Entry> entrys) {
List<TwoTuple<EventType, Map>> rows = new ArrayList<>();
for (Entry entry : entrys) {
// binlog event的事件事件
long executeTime = entry.getHeader().getExecuteTime();
// 当前应用获取到该binlog锁延迟的时间
long delayTime = System.currentTimeMillis() - executeTime; Date date = new Date(entry.getHeader().getExecuteTime()); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 当前的entry(binary log event)的条目类型属于事务
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
TransactionBegin begin = null;
try {
begin = TransactionBegin.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
// 打印事务头信息,执行的线程id,事务耗时
logger.info(transaction_format,
new Object[]{entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()),
simpleDateFormat.format(date),
entry.getHeader().getGtid(), String.valueOf(delayTime)});
logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
printXAInfo(begin.getPropsList());
} else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
TransactionEnd end = null;
try {
end = TransactionEnd.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
// 打印事务提交信息,事务id
logger.info("----------------\\n");
logger.info(" END ----> transaction id: {}", end.getTransactionId());
printXAInfo(end.getPropsList());
logger.info(transaction_format,
new Object[]{entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
entry.getHeader().getGtid(), String.valueOf(delayTime)});
}
continue;
}
// 当前entry(binary log event)的条目类型属于原始数据
if (entry.getEntryType() == EntryType.ROWDATA) { RowChange rowChage = null;
try {
// 获取储存的内容
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
// 获取当前内容的事件类型
EventType eventType = rowChage.getEventType();
logger.info(row_format,
new Object[]{entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(), eventType, String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
entry.getHeader().getGtid(), String.valueOf(delayTime)});
// 事件类型是query或数据定义语言DDL直接打印sql语句,跳出继续下一次循环
if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
logger.info(" sql ----> " + rowChage.getSql() + SEP);
continue;
}
printXAInfo(rowChage.getPropsList());
// 循环当前内容条目的具体数据
for (RowData rowData : rowChage.getRowDatasList()) {
List<CanalEntry.Column> columns;
// 事件类型是delete返回删除前的列内容,否则返回改变后列的内容
if (eventType == CanalEntry.EventType.DELETE) {
columns = rowData.getBeforeColumnsList();
} else {
columns = rowData.getAfterColumnsList();
}HashMap<String, Object> map = new HashMap<>(16);
// 循环把列的name与value放入map中
for (Column column: columns){ map.put(column.getName(), column.getValue());
}
rows.add(new TwoTuple<>(eventType, map));
}
}
}
return rows;
}
ElasticUtils.java
package com.example.canal.study.common;
import com.alibaba.fastjson.JSON;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.Map;
/**
* @author haha
*/@Slf4j @Component
public class ElasticUtils { @Autowired
private RestHighLevelClient restHighLevelClient;
/**
* 新增
* @param student
* @param index 索引
*/
public void saveEs(Student student, String index) {
IndexRequest indexRequest = new IndexRequest(index)
.id(student.getId())
.source(JSON.toJSONString(student), XContentType.JSON)
.opType(DocWriteRequest.OpType.CREATE);
try {
IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
log.info("保存数据至ElasticSearch成功:{}", response.getId());
} catch (IOException e) {
log.error("保存数据至elasticSearch失败: {}", e);
}
}
/**
* 查看
* @param index 索引
* @param id _id
* @throws IOException
*/
public void getEs(String index, String id) throws IOException { GetRequest getRequest = new GetRequest(index, id); GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT); Map<String, Object> fields = response.getSource();
for (Map.Entry<String, Object> entry : fields.entrySet()) { System.out.println(entry.getKey() + ":" + entry.getValue());
}
}
/**
* 更新
* @param student
* @param index 索引
* @throws IOException
*/
public void updateEs(Student student, String index) throws IOException { UpdateRequest updateRequest = new UpdateRequest(index, student.getId());
updateRequest.upsert(JSON.toJSONString(student), XContentType.JSON); UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
log.info("更新数据至ElasticSearch成功:{}", response.getId());
}
/**
* 根据id删除数据
* @param index 索引
* @param id _id
* @throws IOException
*/
public void DeleteEs(String index, String id) throws IOException { DeleteRequest deleteRequest = new DeleteRequest(index, id); DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
log.info("删除数据至ElasticSearch成功:{}", response.getId());
}
}
BinLogElasticSearch.java
package com.example.canal.study.action;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canal.study.common.CanalDataParser;
以上是关于2021-10-24 基于Docker结合Canal实现MySQL实时增量数据传输功能的主要内容,如果未能解决你的问题,请参考以下文章