Canal——canal server 读取 binlog 到 kafka 然后在使用 canal-adapter

Posted caoweixiong

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Canal——canal server 读取 binlog 到 kafka 然后在使用 canal-adapter相关的知识,希望对你有一定的参考价值。

 

前言

本篇只介绍跟 Kafka模式 相关的配置。

 

一、架构

技术图片

 

二、canal-server 配置

  • 修改canal 配置文件: vi /usr/local/canal/conf/canal.properties

...
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka
...
# kafka
/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 canal.mq.servers = 127.0.0.1:6667 canal.mq.retries = 0 # flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 # flatMessage模式下请将该值改大, 建议50-200 canal.mq.lingerMs = 1 canal.mq.bufferMemory = 33554432 # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时 canal.mq.canalGetTimeout = 100 # 是否为flat json格式对象 canal.mq.flatMessage = false canal.mq.compressionType = none canal.mq.acks = all # kafka消息投递是否使用事务 canal.mq.transaction = false
  • 修改instance 配置文件: vi conf/example/instance.properties

#  按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=rds_test
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

 

三、canal-adapter 配置

  • 修改adapter 配置文件:vi conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: kafka # tcp kafka rocketMQ
#  canalServerHost: 127.0.0.1:11111
#  zookeeperHosts: slave1:2181
  mqServers: 127.0.0.1:9092 #or rocketmq
#  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:

  canalAdapters:
  - instance: rds_test # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
       - name: rdb
         key: oracle1
         properties:
           jdbc.driverClassName: oracle.jdbc.OracleDriver
           jdbc.url: jdbc:oracle:thin:@192.168.0.131:1521/orcl
           jdbc.username: system
           jdbc.password: manager
    - groupId: g2
      outerAdapters:
       - name: rdb
         key: oracle2
         properties:
           jdbc.driverClassName: oracle.jdbc.OracleDriver
           jdbc.url: jdbc:oracle:thin:@192.168.0.131:1521/orcl
           jdbc.username: system
           jdbc.password: manager

说明: 一份数据可以被多个 group 同时消费, 多个 group 之间会是一个并行执行, 一个 group 内部是一个串行执行多个 outerAdapters。

 

 

参考:

https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

以上是关于Canal——canal server 读取 binlog 到 kafka 然后在使用 canal-adapter的主要内容,如果未能解决你的问题,请参考以下文章

Canal简介

canal-client和canal-server

Canal源码分析Canal Server的启动和停止过程

Canal利用canal实现mysql实时增量备份并对接kafka

「从零单排canal 05」 server模块源码解析

docker部署canal 1.1.6 rocketmq 分区顺序性