SpringBoot整合Debezium CDC同步数据至目标数据库
Posted 地表最强菜鸡
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合Debezium CDC同步数据至目标数据库相关的知识,希望对你有一定的参考价值。
一、Debezium Spring Boot应用程序架构
原文此处的架构图已缺失。该架构中，嵌入式 Debezium 引擎运行在 Spring Boot 应用进程内：引擎读取 MySQL binlog，把行变更事件交给 ChangeEventHandler 处理，并把读取进度（偏移量）保存在本地文件中，从而实现 CDC（变更数据捕获）。
二、SpringBoot通过pom文件引入jar
<properties>
    <!-- Single Debezium version shared by all Debezium artifacts below -->
    <debezium.version>1.5.2.Final</debezium.version>
</properties>
<!-- ...... -->
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <!-- Maven property references need braces: ${...} -->
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
三、修改配置文件
# Debezium data-sync settings
timely:
  # Whether synchronisation is enabled
  switch: true
  # Offset file (where the engine records how far it has read)
  offset-file-name: /data/mysql/sync/offsets.dat
  # offset-file-name: C:\\Users\\cmg\\Desktop\\offsets.dat
  # Whether to delete the offset file on startup
  offset-file-clean: true
  # Offset commit interval, in ms
  offset-time: 1
  # Schema-history file. Debezium WRITES to this file, so it must be a dedicated file owned by
  # this application - never MySQL's own mysql-bin.index, which would be corrupted.
  history-file-name: /data/mysql/sync/dbhistory.dat
  # history-file-name: C:\\Users\\cmg\\Desktop\\dbhistory.dat
  # Source database to read from
  offline:
    # Connection details come from the environment; never commit real hosts or passwords
    ip: ${TIMELY_DB_HOST:127.0.0.1}
    port: 3306
    username: ${TIMELY_DB_USERNAME:root}
    password: ${TIMELY_DB_PASSWORD}
    # instance-name and logic-name must differ for every database that is read
    # Instance name
    instance-name: instance-name
    # Logical name
    logic-name: instance-name
    # Table(s) to read
    # NOTE(review): the sample handler only acts on wk_crm.wk_crm_customer; include that table
    # here or the handler will never receive a matching event - confirm the intended table list.
    include-table: vehicle
    # Database(s) to read
    include-db: wk_crm
    # Replication client id: must be unique among the MySQL server and all of its replicas,
    # so it must not equal the server's own server_id (which is commonly 1)
    server-id: 184054
四、代码案例
@Configuration
@Log4j2
public class ChangeEventConfig
private final ChangeEventHandler changeEventHandler;
@Value("$timely.offset-file-name")
private String offsetFileName;
@Value("$timely.offset-file-clean:true")
private Boolean offsetFileDelete;
@Value("$timely.offset-time")
private String offsetTime;
@Value("$timely.history-file-name")
private String historyFileName;
@Value("$timely.offline.instance-name")
private String instanceName;
@Value("$timely.offline.logic-name")
private String logicName;
@Value("$timely.offline.ip")
private String ip;
@Value("$timely.offline.port")
private String port;
@Value("$timely.offline.username")
private String username;
@Value("$timely.offline.password")
private String password;
@Value("$timely.offline.include-table")
private String includeTable;
@Value("$timely.offline.include-db")
private String includeDb;
@Value("$timely.offline.server-id")
private String serverId;
@Autowired
public ChangeEventConfig(ChangeEventHandler changeEventHandler)
this.changeEventHandler = changeEventHandler;
@Bean
public void cleanFile()
if (offsetFileDelete && FileUtil.exist(offsetFileName))
FileUtil.del(offsetFileName);
/**
* 实例化sql server 实时同步服务类,执行任务
*
* @param configuration
* @return
*/
@Bean
SqlServerTimelyExecutor sqlServerTimelyExecutor(io.debezium.config.Configuration configuration)
SqlServerTimelyExecutor sqlServerTimelyExecutor = new SqlServerTimelyExecutor();
DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine
.create(ChangeEventFormat.of(Connect.class))
.using(configuration.asProperties())
.notifying(changeEventHandler::handlePayload)
.build();
sqlServerTimelyExecutor.setDebeziumEngine(debeziumEngine);
return sqlServerTimelyExecutor;
/**
* @desc 同步执行服务类
*/
@Data
@Log4j2
public static class SqlServerTimelyExecutor implements InitializingBean, SmartLifecycle
private final ExecutorService executor = ThreadPoolEnum.INSTANCE.getInstance();
private DebeziumEngine<?> debeziumEngine;
@Override
public void start()
log.warn(ThreadPoolEnum.MySQL_LISTENER_POOL + "线程池开始执行 debeziumEngine 实时监听任务!");
executor.execute(debeziumEngine);
@SneakyThrows
@Override
public void stop()
log.warn("debeziumEngine 监听实例关闭!");
debeziumEngine.close();
Thread.sleep(2000);
log.warn(ThreadPoolEnum.MySQL_LISTENER_POOL + "线程池关闭!");
executor.shutdown();
@Override
public boolean isRunning()
return false;
@Override
public void afterPropertiesSet()
Assert.notNull(debeziumEngine, "DebeZiumEngine 不能为空!");
public enum ThreadPoolEnum {
    /**
     * The single instance.
     */
    INSTANCE;

    public static final String SQL_SERVER_LISTENER_POOL = "sql-server-listener-pool";
    public static final String MySQL_LISTENER_POOL = "mysql-listener-pool";

    /**
     * Shared thread pool (singleton).
     */
    private final ExecutorService es;

    /**
     * Enum constructor (implicitly private).
     */
    ThreadPoolEnum() {
        final ThreadFactory threadFactory = namedThreadFactory(MySQL_LISTENER_POOL + "-");
        // AbortPolicy instead of DiscardPolicy: a rejected listener task must fail loudly, otherwise
        // the CDC engine would silently never start.
        es = new ThreadPoolExecutor(8, 16, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(256),
                threadFactory, new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * Creates a thread factory that names threads {@code prefix0, prefix1, ...}. All other thread
     * attributes come from {@code Executors.defaultThreadFactory()}.
     *
     * @param prefix thread-name prefix
     * @return the naming thread factory
     */
    private static ThreadFactory namedThreadFactory(String prefix) {
        final ThreadFactory delegate = java.util.concurrent.Executors.defaultThreadFactory();
        final java.util.concurrent.atomic.AtomicInteger counter = new java.util.concurrent.atomic.AtomicInteger();
        return runnable -> {
            Thread thread = delegate.newThread(runnable);
            thread.setName(prefix + counter.getAndIncrement());
            return thread;
        };
    }

    /**
     * Returns the shared pool.
     *
     * @return ExecutorService
     */
    public ExecutorService getInstance() {
        return es;
    }
}
五、变更数据处理实现类
@Service
@Log4j2
public class ChangeEventHandler
public static final String DATA = "data";
public static final String BEFORE_DATA = "beforeData";
public static final String EVENT_TYPE = "eventType";
public static final String SOURCE = "source";
public static final String TABLE = "table";
private enum FilterJsonFieldEnum
/**
* 表
*/
table,
/**
* 库
*/
db,
/**
* 操作时间
*/
ts_ms,
;
public static Boolean filterJsonField(String fieldName)
return Stream.of(values()).map(Enum::name).collect(Collectors.toSet()).contains(fieldName);
/**
* @desc 变更类型枚举
**/
public enum EventTypeEnum
/**
* 增
*/
CREATE(1),
/**
* 改
*/
UPDATE(2),
/**
* 删
*/
DELETE(3),
;
@Getter
private final int type;
EventTypeEnum(int type)
this.type = type;
public void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,
DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter)
for (RecordChangeEvent<SourceRecord> r : recordChangeEvents)
SourceRecord sourceRecord = r.record();
Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
if (sourceRecordChangeValue == null)
continue;
// 获取变更表数据
Map<String, Object> changeMap = getChangeTableInfo(sourceRecordChangeValue);
if (CollectionUtils.isEmpty(changeMap))
continue;
ChangeListenerModel changeListenerModel = getChangeDataInfo(sourceRecordChangeValue, changeMap);
if (changeListenerModel == null)
continue;
String db = changeListenerModel.getDb();
String table = changeListenerModel.getTable();
Integer eventType = changeListenerModel.getEventType();
//wk_crm.wk_crm_customer 同步es
if ("wk_crm".equalsIgnoreCase(db) && "wk_crm_customer".equalsIgnoreCase(table))
JSONObject jsonObject = JSONObject.parseObject(changeListenerModel.getData());
String customerId = jsonObject.getString("customer_id");
switch (eventType)
//增改
case 1:
case 2:
log.info("变更ES customerId:", customerId);
changeCustomerES(customerId);
break;
//删除
case 3:
log.info("删除ES customerId:", customerId);
deleteCustomerES(customerId);
break;
default:
continue;
else
continue;
String jsonString = JSON.toJSONString(changeListenerModel);
log.info("发送变更数据:", jsonString);
try
recordCommitter.markBatchFinished();
catch (InterruptedException e)
e.printStackTrace();
private void changeCustomerES(String customerId)
HttpRequest post = HttpUtil.createPost("https://xxxx/crmCustomer/addalles?customerIds=" + customerId);
HttpResponse execute = post.execute();
log.info("同步ES返回结果:", execute.body());
private void deleteCustomerES(String customerId)
private ChangeListenerModel getChangeDataInfo(Struct sourceRecordChangeValue, Map<String, Object> changeMap)
// 操作类型过滤,只处理增删改
Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
if (operation != Envelope.Operation.READ)
Integer eventType = null;
Map<String, Object> result = new HashMap<>(4);
if (operation == Envelope.Operation.CREATE)
eventType = EventTypeEnum.CREATE.getType();
result.put(DATA, getChangeData(sourceRecordChangeValue, AFTER));
result.put(BEFORE_DATA, null);
// 修改需要特殊处理,拿到前后的数据
if (operation == Envelope.Operation.UPDATE)
if (!changeMap.containsKey(TABLE))
return null;
eventType = EventTypeEnum.UPDATE.getType();
String currentTableName = String.valueOf(changeMap.get(TABLE).toString());
// 忽略非重要属性变更
Map<String, String> resultMap = filterChangeData(sourceRecordChangeValue, currentTableName);
if (CollectionUtils.isEmpty(resultMap))
return null;
result.put(DATA, resultMap.get(AFTER));
result.put(BEFORE_DATA, resultMap.get(BEFORE));
if (operation == Envelope.Operation.DELETE)
eventType = EventTypeEnum.DELETE.getType();
result.put(DATA, getChangeData(sourceRecordChangeValue, BEFORE));
result.put(BEFORE_DATA, getChangeData(sourceRecordChangeValue, BEFORE));
result.put(EVENT_TYPE, eventType);
result.putAll(changeMap);
return BeanUtil.copyProperties(result, ChangeListenerModel.class);
return null;
/**
* 过滤非重要变更数据
*
* @param sourceRecordChangeValue
* @param currentTableName
* @return
*/
private Map<String, String> filterChangeData(Struct sourceRecordChangeValue, String currentTableName)
Map<String, String> resultMap = new HashMap<>(4);
Map<String, Object> afterMap = getChangeDataMap(sourceRecordChangeValue, AFTER);
Map<String, Object> beforeMap = getChangeDataMap(sourceRecordChangeValue, BEFORE);
//todo 根据表过滤字段
resultMap.put(AFTER, JSON.toJSONString(afterMap));
resultMap.put(BEFORE, JSON.toJSONString(beforeMap));
return resultMap;
/**
* 校验是否仅仅是非重要字段属性变更
* @param currentTableName
* @param afterMap
* @param beforeMap
* @param filterColumnList
* @return
*/
private boolean checkNonEssentialData(String currentTableName, Map<String, Object> afterMap,
Map<String, Object> beforeMap, List<String> filterColumnList)
Map<String, Boolean> filterMap = new HashMap<>(16);
for (String key : afterMap.keySet())
Object afterValue = afterMap.get(key);
Object beforeValue = beforeMap.get(key);
filterMap.put(key, !Objects.equals(beforeValue, afterValue));
filterColumnList.parallelStream().forEach(filterMap::remove);
if (filterMap.values().stream().noneMatch(x -> x))
log.info("表:无核心资料变更,忽略此次操作!", currentTableName);
return true;
return false;
public String getChangeData(Struct sourceRecordChangeValue, String record)
Map<String, Object> changeDataMap = getChangeDataMap(sourceRecordChangeValue, record);
if (CollectionUtils.isEmpty(changeDataMap))
return null;
return JSON.toJSONString(changeDataMap);
public Map<String, Object> getChangeDataMap(Struct sourceRecordChangeValue, String record)
Struct struct = (Struct) sourceRecordChangeValue.get(record);
// 将变更的行封装为Map
Map<String, Object> changeData = struct.schema().fields().stream()
.map(Field::name)
.filter(fieldName -> struct.get(fieldName) != null)
.map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
.collect(toMap(Pair::getKey, Pair::getValue));
if (CollectionUtils.isEmpty(changeData))
return null;
return changeData;
private Map<String, Object> getChangeTableInfo(Struct sourceRecordChangeValue)
Struct struct = (Struct) sourceRecordChangeValue.get(SOURCE);
Map<String, Object> map = struct.schema().fields().stream()
.map(Field::name)
.filter(fieldName -> struct.get(fieldName) != null && FilterJsonFieldEnum.filterJsonField(fieldName))
.map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
.collect(toMap(Pair::getKey, Pair::getValue));
if (map.containsKey(FilterJsonFieldEnum.ts_ms.name()))
map.put("changeTime", map.get(FilterJsonFieldEnum.ts_ms.name()));
map.remove(FilterJsonFieldEnum.ts_ms.name());
return map;
六、部署方式
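将构建好的 jar 包或容器镜像上传至目标服务器并启动。启动前请确认：偏移量文件（timely.offset-file-name）和历史记录文件（timely.history-file-name）所在目录对应用可写。历史记录文件必须是应用专用的文件，不能指向 MySQL 自身的 mysql-bin.index。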
以上是关于SpringBoot整合Debezium CDC同步数据至目标数据库的主要内容,如果未能解决你的问题,请参考以下文章
Debezium 没有为 mysql 提供嵌入式版本的 CDC