spring cloud集成canal

Posted xiaostudy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring cloud集成canal相关的知识,希望对你有一定的参考价值。

前提

win运行canal

 

加入canal依赖

1 <dependency>
2     <groupId>com.alibaba.otter</groupId>
3     <artifactId>canal.client</artifactId>
4     <version>1.1.3</version>
5 </dependency>

 

把ip、端口、监听表名做成配置文件

技术图片

 

 

 

代码实现

  1 package com.frame.modules.dabis.archives.thread;
  2 
  3 import com.alibaba.fastjson.JSONObject;
  4 import com.alibaba.otter.canal.client.CanalConnector;
  5 import com.alibaba.otter.canal.client.CanalConnectors;
  6 import com.alibaba.otter.canal.protocol.CanalEntry;
  7 import com.alibaba.otter.canal.protocol.Message;
  8 import com.frame.solr.em.SolrCode;
  9 import com.frame.utils.PropertiesLoader;
 10 import org.apache.commons.logging.Log;
 11 import org.apache.commons.logging.LogFactory;
 12 
 13 import java.net.InetSocketAddress;
 14 import java.util.HashMap;
 15 import java.util.List;
 16 import java.util.Map;
 17 
 18 /**
 19  * @author liwei
 20  * @date 2019/8/2 14:39
 21  * @desc Created with IntelliJ IDEA.
 22  */
 23 public class CanalThread implements Runnable 
 24 
 25     Log log = LogFactory.getLog(CanalThread.class);
 26 
 27     private String solrName = SolrCode.ARCHIVES.getValue();
 28 
 29 
 30     @Override
 31     public void run() 
 32         PropertiesLoader loader = new PropertiesLoader("solrConfig.properties");
 33         listener(loader.getProperty("canalHost"), loader.getProperty("canalPort"), loader.getProperty("canalTable"));
 34     
 35 
 36 
 37     public void listener(String canalHost, String canalPort, String table) 
 38         // 创建链接
 39         CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost, Integer.valueOf(canalPort)), "example", "", "");
 40         int batchSize = 1000;
 41         try 
 42             // 连接
 43             connector.connect();
 44             // 监听表
 45             connector.subscribe(table);
 46             connector.rollback();
 47             // 一直循环监听
 48             while (true) 
 49                 // 获取指定数量的数据
 50                 Message message = connector.getWithoutAck(batchSize);
 51                 long batchId = message.getId();
 52                 if(-1 != batchId && 0 != message.getEntries().size()) 
 53                     printEntry(message.getEntries());
 54                 
 55                 // 提交确认
 56                 connector.ack(batchId);
 57             
 58          finally 
 59             connector.disconnect();
 60         
 61     
 62 
 63     /**
 64      * 打印具体变化
 65      * @param entrys
 66      */
 67     private void printEntry(List<CanalEntry.Entry> entrys) 
 68         for (CanalEntry.Entry entry : entrys) 
 69             if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType()) || CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) 
 70                 continue;
 71             
 72 
 73             CanalEntry.RowChange rowChage = null;
 74             try 
 75                 rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
 76              catch (Exception e) 
 77                 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
 78                         e);
 79             
 80 
 81             CanalEntry.EventType eventType = rowChage.getEventType();
 82             System.out.println(String.format("================> binlog[%s:%s] , 数据库:%s,表名%s , 类型: %s",
 83                     entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
 84                     entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
 85                     eventType));
 86 
 87             for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) 
 88                 if (eventType == CanalEntry.EventType.DELETE) 
 89                     printColumn(rowData.getBeforeColumnsList());
 90                  else if (eventType == CanalEntry.EventType.INSERT) 
 91                     printColumn(rowData.getAfterColumnsList());
 92                  else 
 93                     System.out.println("-------修改之前");
 94                     printColumn(rowData.getBeforeColumnsList());
 95                     System.out.println("-------修改之后");
 96                     printColumn(rowData.getAfterColumnsList());
 97                 
 98             
 99         
100     
101 
102     private void printColumn(List<CanalEntry.Column> columns) 
103         Map<String,Object> aaMap = new HashMap<>();
104         for (CanalEntry.Column column : columns) 
105             aaMap.put(column.getName(), column.getValue());
106         
107         System.out.println( new JSONObject(aaMap).toJSONString());
108     
109 

 

新增

技术图片

 

 技术图片

 

 

 

修改

技术图片

 

 技术图片

 

 技术图片

 

 

删除

技术图片

 

 技术图片

 

 技术图片

 

 

注意:拿到的值都是字符串,建议拿到id反查数据库,拿到对象再同步到自己的缓存。

 

以上是关于spring cloud集成canal的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud Gateway集成

Spring Cloud Hystrix集成

Spring cloud 集成Swagger

Spring Cloud微服务分布式云架构 - spring cloud集成项目

Spring Cloud Ribbon集成

Spring Cloud微服务分布式云架构 - spring cloud集成项目