使用canal将mysql同步到es中

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用canal将mysql同步到es中相关的知识,希望对你有一定的参考价值。

参考技术A

因为自己项目中需要用到mysql数据同步到es中,查找了相关资料最后决定用canal来做,所以便有了本文,下面一起来看如何使用canal吧

根据 https://github.com/alibaba/canal 上的原理解释,我们知道 canal 会模拟 mysql slave 的交互协议,伪装自己为 mysql slave,然后向 mysql master 发送 dump 协议。

mysql master 收到 dump 请求,开始推送 binary log 给 slave(也就是 canal),然后 canal 解析 binary log 对象(原始为 byte流)。

经 canal 解析过的对象,我们使用起来就非常的方便了。

再根据 https://github.com/alibaba/canal/releases 提供的版本信息,你会发现 canal 其实相当于一个中间件,专门用来解析 MySQL 的 binlog 日志。canal 解析好了之后,会封装成一个数据对象,通过 protobuf3.0 协议进行交互,让 canal 客户端进行消费。

根据上面的解释,以及 canal 提供的版本信息,我们在使用 canal 的时候,首选要安装一个 canal.deployer-1.1.4.tar.gz 进行解析 MySQL 的 binlog 日志。

下载后,复制 canal.deployer-1.1.4.tar.gz 到 MySQL 主机上,比如放在 /usr/local/soft/目录下。然后依次执行下面的命令:

然后修改 canal 的配置文件 vim conf/example/instance.properties

这三项改成你自己的,比如我的配置如下:

然后保存并退出。(VI 模式下,按 Esc 输入 :wq 回车退出。)

接着,我们检查一下 MySQL 的配置。确定版本和是否开启了 binlog 日志,以及日志格式。

canal 支持 binlog 格式为 ROW 的模式。如果你没开启 binlog,并且格式是非 row 的,建议修改一下 mysql 的配置文件。

执行 mysql –help | grep my.cnf 找到 mysql 的 my.cnf 文件。

执行 vim /etc/my.cnf 命令。添加下面 3 个配置。

然后保存并退出。

接着执行 sudo service mysqld restart 重启 MySQL。

需要注意的是你的 mysql 用户,必须要有 REPLICATION SLAVE 权限。该权限授予 slave 服务器以该账户连接 master 后可以执行 replicate 操作的权利。

如果没有权限,则使用 root 账户登录进 MySQL,执行下面的语句,创建用户,分配权限。

MySQL 启动后,就可以开启 canal 服务了。

开启后,观察 canal 服务的日志,确保服务正常。

查看 canal 的日志

确定没有问题后,开始编写我们的测试程序。

pom.xml 中导入下面的依赖。

使用JAVA进行测试

然后执行 main 方法。你再修改修改 MySQL 中的数据,你会发现所有改变都同步过来了。上面是使用的Java代码进行运行,如果想用canal.adapter来进行运行可以下载

放入服务器中,依次执行下面命令

然后修改配置文件 :

然后将需要运行存储到es的的yml文件放入到

目录下。例如:

然后开启canal-adapter服务

/usr/local/soft/canal-adapter/bin/startup.sh

查看 canal-adapter 的日志,确定没有问题后修改数据 就可以同步到es了

注意:

1、canal-adapter自带mysql连接使用的5.x的,如果自己安装的是高版本的mysql需要自己去/usr/local/soft/canal-adapter/lib增加对应的jar包

2、因项目中同步es使用的sql中有数据库中没有的字段,导致原生程序一直报异常,后修改源码中

加了一个判断后才可以

3、es中使用的date字段类型和数据库中不一致,所以这里又修改了部分源码兼容我们项目中的类型

可以根据各自情况修改。

canal实时同步mysql表到es

方案说明

canal是阿里云开源的解析binlog组件,同步到ES需要deployer和adapter两部分:
deployer解析MySQL binlog,adapter将数据写入到ES

优点

开源成熟稳定
简单易用

缺点

不支持全量数据初始化

部署

logstash进行全量
/etc/hosts配置es7.test.com域名解析

deployer

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar zxf canal.deployer-1.1.5.tar.gz -C deployer
cd deployer
vim conf/shop/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# table meta tsdb info
canal.instance.tsdb.enable=true

# username/password
canal.instance.dbUsername=root 
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false

# table regex
canal.instance.filter.regex=test.shop
#################################################

# 启动
sh bin/startup.sh
# 关闭
sh bin/stop.sh

adapter

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
tar zxf canal.adapter-1.1.5.tar.gz -C adapter
cd adapter
# 配置消费
vim conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp
  syncBatchSize: 2
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: shop
    groups:
    - groupId: g-shop
      outerAdapters:
      - name: logger
      - name: es7
        hosts: es7.test.com:9200
        properties:
          mode: rest
          security.auth: elastic:123456
          cluster.name: testes

### 表映射
vim conf/es7/shop.yml
dataSourceKey: defaultDS
destination: shop
groupId: g-shop
esMapping:
  _index: canal_shop
  _type: _doc
  _id: _id
  upsert: true
  sql: "select id _id,id,cid,name,remartk from shop"
  commitBatch: 1000

# 启动
sh bin/startup.sh
# 关闭
sh bin/stop.sh

 ES创建索引

PUT /canal_shop
{
    "settings": {
        "number_of_replicas": 0,
        "number_of_shards": 1,
        "index.codec": "best_compression"
    },
    "mappings": {
        "properties" : {
            "id":{
                "type":"integer"
            },
            "cid":{
                "type":"integer"
            },
            "name":{
                "type":"text"
            },
            "remark":{
                "type":"text"
            }
            }
    }
}

以上是关于使用canal将mysql同步到es中的主要内容,如果未能解决你的问题,请参考以下文章

Canal——增量同步MySQL数据到ES

使用Canal中间件同步MySql数据到ElasticSearch

实现mysql与ES的增量数据同步

canal同步mysql数据至es5.5.0

canal实现同步mysql至es

springboot 整合 canal