springboot整合canal,监听MySQL binlog日志,实现增量同步
Posted 符华-
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot整合canal,监听MySQL binlog日志,实现增量同步相关的知识,希望对你有一定的参考价值。
有两个数据库,并不是主从关系,但是需要同步某张表,可以通过binlog日志,进行同步,前提是这两个数据库的要同步的表,表名和字段名需要一致。
当前项目连接的数据库(需要同步的数据库):base_project
需要将数据同步到 base_project 的数据库(需要监听的数据库):test
一、下载canal
我整合的是1.1.4版本,所以下载也是下载的1.1.4版本
解压,打开 conf/example/instance.properties 文件
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# 要监听的数据库ip地址和端口号,ip地址用真实ip,不要用localhost或127.0.0.1
canal.instance.master.address=192.168.0.111:3306
# binlog的名称,canalv1.1.5不需要设置日志名称和偏移量,canal会自动识别
canal.instance.master.journal.name=binlog.000189
# 偏移量
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password,MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=test.customer,test.fault
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\\\..*,.*\\\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\\\..*
#################################################
然后再打开 conf 下的 canal.properties 文件
# 找到下面这一句代码,这句代码默认是注释的,放开注释。如果不放开注释,可能在程序中就接收不到数据库的操作消息
# 如果放开了还是接收不到,可以试着把值调大一点
canal.instance.tsdb.snapshot.interval = 16
二、数据库配置
1、开启binlog
我的MySQL是8.0以上版本的,binlog是默认开启的,如果不知道是否开启的话,执行以下sql,value是ON说明是开启了,OFF是关闭状态,需要开启。
show variables like 'log_bin';
2、创建用户并授权
#创建用户canal,密码为canal,主机地址为192.168.0.111
create user canal@192.168.0.111 identified by 'canal';
#SHOW VIEW 查看视图,SELECT 查询,REPLICATION SLAVE、REPLICATION CLIENT 复制,*.* 表示所有库
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@192.168.0.111;
# 刷新
flush privileges;
3、设置binlog的模式为ROW
# 查看binlog的模式,如果value不是ROW,需要设置成ROW
show variables like 'binlog_format';
# 设置ROW
SET SESSION binlog_format = 'ROW';
三、启动canal
1、bin目录下,双击 startup.sh 启动
2、logs/canal 目录下,查看 canal.log,这个样子说明启动成功
3、logs/example 目录下,查看 example.log,这个样子没有报错就没问题
如果有 caching_sha2_password Auth failed 异常,则修改canal用户对应的身份验证插件为 mysql_native_password
java.net.ConnectException: Failed to connect to localhost/127.0.0.1 异常,需要将canal用户的主机localhost或127.0.0.1改为本机ip地址,配置文件的也要改。按照我的配置应该不会出现这个异常。
四、整合canal
1、pom.xml
<!-- 整合canal,监听数据库binlog日志,实现增量同步 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
<!-- 去掉guava依赖,否则启动报错 -->
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
<version>1.7</version>
</dependency>
2、yml
spring:
datasource:
url: jdbc:mysql://127.0.0.1:3306/base_project?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
username: root
password: root
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
mybatis-plus:
mapper-locations: classpath:mapper/*/*.xml
type-aliases-package: com.entity.sys,;com.common.base
global-config:
db-config:
id-type: auto
field-strategy: NOT_EMPTY
db-type: MYSQL
configuration:
map-underscore-to-camel-case: true
call-setters-on-nulls: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
canal-monitor-mysql:
#要监听的数据库的主机地址,用具体的ip地址,不要用localhost或127.0.0.1
hostname: "192.168.0.111"
#canal端口号,这个是固定的:11111
port: 11111
#这个也是固定的
example: "example"
#要监听的数据库名和表名,这里我只监听用户表和部门表;指定多个表用逗号隔开
#如果是监听数据库的全部表,用:test\\\\..*
tableName: test.sys_user,test.sys_dept
3、CanalUtil
import cn.hutool.core.lang.Console;
import cn.hutool.core.util.StrUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.mapper.pwjk.SqlMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;
/**
* @author Admin
* 监听数据库binlog日志,实现监听的数据库和当前数据库增删改同步
*/
@Component
public class CanalUtil
/**
* 要监听的数据库的主机地址
*/
@Value("$canal-monitor-mysql.hostname")
private String canalMonitorHost;
/**
* canal端口号,这个是固定的用:11111
*/
@Value("$canal-monitor-mysql.port")
private Integer canalMonitorPort;
/**
* canal的example,这个值是固定的用:example
*/
@Value("$canal-monitor-mysql.example")
private String canalExample;
/**
* 要监听的数据库名和表名
*/
@Value("$canal-monitor-mysql.tableName")
private String canalMonitorTableName;
@Resource
private SqlMapper sqlMapper;
/**
* canal入库方法
*/
public void run()
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost,canalMonitorPort), canalExample, "", "");
int batchSize = 1000;
try
connector.connect();
Console.log("数据库检测连接成功:" + canalMonitorTableName);
connector.subscribe(canalMonitorTableName);
connector.rollback();
try
while (true)
//尝试从master那边拉去数据batchSize条记录,有多少取多少
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0)
//每隔一秒监听一次
Thread.sleep(1000);
else
dataHandle(message.getEntries());
connector.ack(batchId);
catch (InterruptedException e)
e.printStackTrace();
catch (InvalidProtocolBufferException e)
e.printStackTrace();
finally
connector.disconnect();
/**
* 数据处理
*/
private void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException
try
for (CanalEntry.Entry entry : entrys)
if (CanalEntry.EntryType.ROWDATA == entry.getEntryType())
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
if (eventType == CanalEntry.EventType.DELETE)
//删除,返回删除的sql语句,然后在mapper中,直接执行这句sql
String sql = saveDeleteSql(entry);
if (StrUtil.isNotBlank(sql))
sqlMapper.dynamicsDelete(sql);
else if (eventType == CanalEntry.EventType.UPDATE)
//更新,返回更新的sql语句,然后在mapper中,直接执行这句sql
String sql = saveUpdateSql(entry);
if (StrUtil.isNotBlank(sql))
sqlMapper.dynamicsUpdate(sql);
else if (eventType == CanalEntry.EventType.INSERT)
//新增,返回新增的sql语句,然后在mapper中,直接执行这句sql
String sql = saveInsertSql(entry);
if (StrUtil.isNotBlank(sql))
sqlMapper.dynamicsInsert(sql);
catch (Exception e)
return;
/**
* 保存更新语句
*/
private String saveUpdateSql(CanalEntry.Entry entry)
try
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDataList)
List<CanalEntry.Column> newColumnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
for (int i = 0; i < newColumnList.size(); i++)
if (!newColumnList.get(i).getIsKey())
sql.append(" " + newColumnList.get(i).getName() + " = '" + newColumnList.get(i).getValue() + "'");
if (i != newColumnList.size() - 1)
sql.append(",");
sql.append(" where ");
List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : oldColumnList)
if (column.getIsKey())
//暂时只支持单一主键
sql.append(column.getName() + "=" + column.getValue());
break;
return sql.toString();
catch (InvalidProtocolBufferException e)
return null;
return null;
/**
* 保存删除语句
*/
private String saveDeleteSql(CanalEntry.Entry entry)
try
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDataList)
List<CanalEntry.Column> columnList = rowData.getBeforeColumnsList();
StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
for (CanalEntry.Column column : columnList)
if (column.getIsKey())
sql.append(column.getName() + "=" + column.getValue());
break;
return sql.toString();
catch (InvalidProtocolBufferException e)
return null;
return null;
/**
* 保存插入语句
*/
private String saveInsertSql(CanalEntry.Entry entry)
try
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDataList)
List<CanalEntry.Column> columnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("insert into "+entry.getHeader().getTableName() + " (");
for (int i = 0; i < columnList.size(); i++)
sql.append(columnList.get(i).getName());
if (i != columnList.size() - 1)
sql.append(",");
sql.append(") VALUES (");
for (int i = 0; i < columnList.size(); i++)
sql.append("'" + columnList.get(i).getValue() + "'");
if (i != columnList.size() - 1)
sql.append(",");
sql.append(")");
return sql.toString();
以上是关于springboot整合canal,监听MySQL binlog日志,实现增量同步的主要内容,如果未能解决你的问题,请参考以下文章