SpringBoot整合Debezium CDC同步数据至目标数据库

Posted 地表最强菜鸡

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合Debezium CDC同步数据至目标数据库相关的知识,希望对你有一定的参考价值。

一、Debezium Spring Boot应用程序架构

        （原文此处的架构图在转载时缺失。）该工作流使用嵌入式 Debezium 引擎在 Spring Boot 应用内部执行 CDC：Debezium MySQL 连接器读取源库的变更，把行级变更事件交给应用中的处理器（下文的 ChangeEventHandler），再由处理器将变更同步到目标端。

二、SpringBoot通过pom文件引入jar 

 <properties>
        <debezium.version>1.5.2.Final</debezium.version>
 </properties>

        <!-- Other dependencies omitted. The code below also uses Hutool (FileUtil, HttpUtil, BeanUtil),
             fastjson (JSON) and Guava (ThreadFactoryBuilder); presumably they are among the omitted entries. -->

        <!-- Maven property references must be written as ${...}; "$debezium.version" would be used literally. -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-mysql</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

三、修改配置文件

# Debezium data synchronisation settings
timely:
  # Whether synchronisation is enabled
  switch: true
  # Offset file (stores the position reached in the source database)
  offset-file-name: /data/mysql/sync/offsets.dat
#  offset-file-name: C:\\Users\\cmg\\Desktop\\offsets.dat
  # Whether to delete the offset file on startup.
  # true discards the recorded position on every restart.
  offset-file-clean: true
  # Offset commit interval, in ms
  offset-time: 1
  # Database history file. NOTE(review): presumably passed to Debezium's
  # database.history.file.filename, which Debezium itself writes to. It must therefore be a
  # Debezium-owned file, never MySQL's own binlog index (the original value was
  # /data/mysql/own/datadir/mysql-bin.index).
  history-file-name: /data/mysql/sync/dbhistory.dat
#  history-file-name: C:\\Users\\cmg\\Desktop\\dbhistory.dat
  # Source database to read
  offline:
    # Connection details come from the environment rather than being published in plain text
    ip: ${MYSQL_HOST}
    port: ${MYSQL_PORT:3306}
    username: ${MYSQL_USERNAME}
    password: ${MYSQL_PASSWORD}
    # instance-name / logic-name must be different for every database that is read
    # Instance name
    instance-name: instance-name
    # Logical name
    logic-name: instance-name
    # Table to read; ChangeEventHandler only syncs wk_crm.wk_crm_customer, so it must be included
    include-table: wk_crm_customer
    # Database to read
    include-db: wk_crm
    # Replication client id: must be unique among all servers/replicas of the MySQL cluster.
    # 1 is a common server_id for the MySQL server itself (MySQL 8.0's default), so avoid it.
    server-id: 184054

四、代码案例

@Configuration
@Log4j2
public class ChangeEventConfig 
    private final ChangeEventHandler changeEventHandler;
 
    @Value("$timely.offset-file-name")
    private String offsetFileName;
    @Value("$timely.offset-file-clean:true")
    private Boolean offsetFileDelete;
    @Value("$timely.offset-time")
    private String offsetTime;
    @Value("$timely.history-file-name")
    private String historyFileName;
    @Value("$timely.offline.instance-name")
    private String instanceName;
    @Value("$timely.offline.logic-name")
    private String logicName;
    @Value("$timely.offline.ip")
    private String ip;
    @Value("$timely.offline.port")
    private String port;
    @Value("$timely.offline.username")
    private String username;
    @Value("$timely.offline.password")
    private String password;
    @Value("$timely.offline.include-table")
    private String includeTable;
    @Value("$timely.offline.include-db")
    private String includeDb;
    @Value("$timely.offline.server-id")
    private String serverId;
 
    @Autowired
    public ChangeEventConfig(ChangeEventHandler changeEventHandler) 
        this.changeEventHandler = changeEventHandler;
    
 
    @Bean
    public void cleanFile() 
        if (offsetFileDelete && FileUtil.exist(offsetFileName)) 
            FileUtil.del(offsetFileName);
        
    
 
 
    /**
     * 实例化sql server 实时同步服务类,执行任务
     *
     * @param configuration
     * @return
     */
    @Bean
    SqlServerTimelyExecutor sqlServerTimelyExecutor(io.debezium.config.Configuration configuration) 
        SqlServerTimelyExecutor sqlServerTimelyExecutor = new SqlServerTimelyExecutor();
        DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine
                .create(ChangeEventFormat.of(Connect.class))
                .using(configuration.asProperties())
                .notifying(changeEventHandler::handlePayload)
                .build();
        sqlServerTimelyExecutor.setDebeziumEngine(debeziumEngine);
        return sqlServerTimelyExecutor;
    
 
    /**
     * @desc 同步执行服务类
     */
    @Data
    @Log4j2
    public static class SqlServerTimelyExecutor implements InitializingBean, SmartLifecycle 
        private final ExecutorService executor = ThreadPoolEnum.INSTANCE.getInstance();
        private DebeziumEngine<?> debeziumEngine;
 
        @Override
        public void start() 
            log.warn(ThreadPoolEnum.MySQL_LISTENER_POOL + "线程池开始执行 debeziumEngine 实时监听任务!");
            executor.execute(debeziumEngine);
        
 
        @SneakyThrows
        @Override
        public void stop() 
            log.warn("debeziumEngine 监听实例关闭!");
            debeziumEngine.close();
            Thread.sleep(2000);
            log.warn(ThreadPoolEnum.MySQL_LISTENER_POOL + "线程池关闭!");
            executor.shutdown();
        
 
        @Override
        public boolean isRunning() 
            return false;
        
 
        @Override
        public void afterPropertiesSet() 
            Assert.notNull(debeziumEngine, "DebeZiumEngine 不能为空!");
        
 
        public enum ThreadPoolEnum 
            /**
             * 实例
             */
            INSTANCE;
 
            public static final String SQL_SERVER_LISTENER_POOL = "sql-server-listener-pool";
 
            public static final String MySQL_LISTENER_POOL = "mysql-listener-pool";
            /**
             * 线程池单例
             */
            private final ExecutorService es;
 
            /**
             * 枚举 (构造器默认为私有)
             */
            ThreadPoolEnum() 
                final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(MySQL_LISTENER_POOL + "-%d").build();
                es = new ThreadPoolExecutor(8, 16, 60,
                        TimeUnit.SECONDS, new ArrayBlockingQueue<>(256),
                        threadFactory, new ThreadPoolExecutor.DiscardPolicy());
            
 
            /**
             * 公有方法
             *
             * @return ExecutorService
             */
            public ExecutorService getInstance() 
                return es;
            
        
    
 

五、变更数据处理实现类

@Service
@Log4j2
public class ChangeEventHandler 
 
    public static final String DATA = "data";
    public static final String BEFORE_DATA = "beforeData";
    public static final String EVENT_TYPE = "eventType";
    public static final String SOURCE = "source";
    public static final String TABLE = "table";
 
    private enum FilterJsonFieldEnum 
        /**
         * 表
         */
        table,
        /**
         * 库
         */
        db,
        /**
         * 操作时间
         */
        ts_ms,
        ;
 
        public static Boolean filterJsonField(String fieldName) 
            return Stream.of(values()).map(Enum::name).collect(Collectors.toSet()).contains(fieldName);
        
    
 
    /**
     * @desc 变更类型枚举
     **/
    public enum EventTypeEnum 
        /**
         * 增
         */
        CREATE(1),
        /**
         * 改
         */
        UPDATE(2),
        /**
         * 删
         */
        DELETE(3),
        ;
        @Getter
        private final int type;
 
        EventTypeEnum(int type) 
            this.type = type;
        
    
 
    public void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,
                              DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) 
        for (RecordChangeEvent<SourceRecord> r : recordChangeEvents) 
            SourceRecord sourceRecord = r.record();
            Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
            if (sourceRecordChangeValue == null) 
                continue;
            
            // 获取变更表数据
            Map<String, Object> changeMap = getChangeTableInfo(sourceRecordChangeValue);
            if (CollectionUtils.isEmpty(changeMap)) 
                continue;
            
            ChangeListenerModel changeListenerModel = getChangeDataInfo(sourceRecordChangeValue, changeMap);
            if (changeListenerModel == null) 
                continue;
            
            String db = changeListenerModel.getDb();
            String table = changeListenerModel.getTable();
            Integer eventType = changeListenerModel.getEventType();
            //wk_crm.wk_crm_customer 同步es
            if ("wk_crm".equalsIgnoreCase(db) && "wk_crm_customer".equalsIgnoreCase(table))
                JSONObject jsonObject = JSONObject.parseObject(changeListenerModel.getData());
                String customerId = jsonObject.getString("customer_id");
 
                switch (eventType)
                    //增改
                    case 1:
                    case 2:
                        log.info("变更ES customerId:", customerId);
                        changeCustomerES(customerId);
                        break;
                    //删除
                    case 3:
                        log.info("删除ES customerId:", customerId);
                        deleteCustomerES(customerId);
                        break;
                    default:
                        continue;
                
 
            else 
                continue;
            
 
            String jsonString = JSON.toJSONString(changeListenerModel);
            log.info("发送变更数据:", jsonString);
        
        try 
            recordCommitter.markBatchFinished();
         catch (InterruptedException e) 
            e.printStackTrace();
        
    
 
    private void changeCustomerES(String customerId) 
        HttpRequest post = HttpUtil.createPost("https://xxxx/crmCustomer/addalles?customerIds=" + customerId);
        HttpResponse execute = post.execute();
        log.info("同步ES返回结果:", execute.body());
    
    private void deleteCustomerES(String customerId) 
 
    
 
    private ChangeListenerModel getChangeDataInfo(Struct sourceRecordChangeValue, Map<String, Object> changeMap) 
        // 操作类型过滤,只处理增删改
        Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
        if (operation != Envelope.Operation.READ) 
            Integer eventType = null;
            Map<String, Object> result = new HashMap<>(4);
            if (operation == Envelope.Operation.CREATE) 
                eventType = EventTypeEnum.CREATE.getType();
                result.put(DATA, getChangeData(sourceRecordChangeValue, AFTER));
                result.put(BEFORE_DATA, null);
            
            // 修改需要特殊处理,拿到前后的数据
            if (operation == Envelope.Operation.UPDATE) 
                if (!changeMap.containsKey(TABLE)) 
                    return null;
                
                eventType = EventTypeEnum.UPDATE.getType();
                String currentTableName = String.valueOf(changeMap.get(TABLE).toString());
                // 忽略非重要属性变更
                Map<String, String> resultMap = filterChangeData(sourceRecordChangeValue, currentTableName);
                if (CollectionUtils.isEmpty(resultMap)) 
                    return null;
                
                result.put(DATA, resultMap.get(AFTER));
                result.put(BEFORE_DATA, resultMap.get(BEFORE));
            
            if (operation == Envelope.Operation.DELETE) 
                eventType = EventTypeEnum.DELETE.getType();
                result.put(DATA, getChangeData(sourceRecordChangeValue, BEFORE));
                result.put(BEFORE_DATA, getChangeData(sourceRecordChangeValue, BEFORE));
            
            result.put(EVENT_TYPE, eventType);
            result.putAll(changeMap);
            return BeanUtil.copyProperties(result, ChangeListenerModel.class);
        
        return null;
    
 
    /**
     * 过滤非重要变更数据
     *
     * @param sourceRecordChangeValue
     * @param currentTableName
     * @return
     */
    private Map<String, String> filterChangeData(Struct sourceRecordChangeValue, String currentTableName) 
        Map<String, String> resultMap = new HashMap<>(4);
        Map<String, Object> afterMap = getChangeDataMap(sourceRecordChangeValue, AFTER);
        Map<String, Object> beforeMap = getChangeDataMap(sourceRecordChangeValue, BEFORE);
        //todo 根据表过滤字段
        resultMap.put(AFTER, JSON.toJSONString(afterMap));
        resultMap.put(BEFORE, JSON.toJSONString(beforeMap));
        return resultMap;
    
 
    /**
     * 校验是否仅仅是非重要字段属性变更
     * @param currentTableName
     * @param afterMap
     * @param beforeMap
     * @param filterColumnList
     * @return
     */
    private boolean checkNonEssentialData(String currentTableName, Map<String, Object> afterMap,
                                          Map<String, Object> beforeMap, List<String> filterColumnList) 
        Map<String, Boolean> filterMap = new HashMap<>(16);
        for (String key : afterMap.keySet()) 
            Object afterValue = afterMap.get(key);
            Object beforeValue = beforeMap.get(key);
            filterMap.put(key, !Objects.equals(beforeValue, afterValue));
        
        filterColumnList.parallelStream().forEach(filterMap::remove);
        if (filterMap.values().stream().noneMatch(x -> x)) 
            log.info("表:无核心资料变更,忽略此次操作!", currentTableName);
            return true;
        
        return false;
    
 
 
    public String getChangeData(Struct sourceRecordChangeValue, String record) 
        Map<String, Object> changeDataMap = getChangeDataMap(sourceRecordChangeValue, record);
        if (CollectionUtils.isEmpty(changeDataMap)) 
            return null;
        
        return JSON.toJSONString(changeDataMap);
    
 
    public Map<String, Object> getChangeDataMap(Struct sourceRecordChangeValue, String record) 
        Struct struct = (Struct) sourceRecordChangeValue.get(record);
        // 将变更的行封装为Map
        Map<String, Object> changeData = struct.schema().fields().stream()
                .map(Field::name)
                .filter(fieldName -> struct.get(fieldName) != null)
                .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                .collect(toMap(Pair::getKey, Pair::getValue));
        if (CollectionUtils.isEmpty(changeData)) 
            return null;
        
        return changeData;
    
 
    private Map<String, Object> getChangeTableInfo(Struct sourceRecordChangeValue) 
        Struct struct = (Struct) sourceRecordChangeValue.get(SOURCE);
        Map<String, Object> map = struct.schema().fields().stream()
                .map(Field::name)
                .filter(fieldName -> struct.get(fieldName) != null && FilterJsonFieldEnum.filterJsonField(fieldName))
                .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                .collect(toMap(Pair::getKey, Pair::getValue));
        if (map.containsKey(FilterJsonFieldEnum.ts_ms.name())) 
            map.put("changeTime", map.get(FilterJsonFieldEnum.ts_ms.name()));
            map.remove(FilterJsonFieldEnum.ts_ms.name());
        
        return map;
    
 

六、部署方式

        将构建好的 jar 包或容器镜像上传至目标服务器（原文记作 sourceTarget）并启动。

以上是关于SpringBoot整合Debezium CDC同步数据至目标数据库的主要内容,如果未能解决你的问题,请参考以下文章

CDC 与 docker 中的 debezium

多个表之间 CDC 事件的 Debezium 排序

Debezium 没有为 mysql 提供嵌入式版本的 CDC

成功创建 Always On SQL Server 快照后,Debezium 未跟踪 CDC

PostgreSQL 和事务上的 Debezium CDC

Debezium-Flink-Hudi:实时流式CDC