ES增量同步方案
Posted 涂有
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ES增量同步方案相关的知识,希望对你有一定的参考价值。
1 基于业务代码嵌入式的增量同步方式
在Java业务代码要修改业务数据的地方,增加调用写入ES数据的方法
优点:1、实现方式简单,可控粒度高;2、不依赖第三方数据同步框架;3、数据库不用做特殊配置和部署;
缺点:1:所有修改数据的地方都要添加同步ES逻辑,新增接口或者修改代码要同时关注ES的同步逻辑,否则可能导致数据库和ES的数据不一致;
2:如果直接提sql修数据,会造成数据库和ES数据不一致;
3:要自己处理同步失败的重试问题
2 基于数据库离线日志的增量同步方式
通过监控oracle的redo log日志和在线日志的方式,把日志文件还原成sql,把sql发送到MQ中间件,Java消费MQ数据把数据的变更同步到ES,同步流程参考下图。
优化:1、对数据库性能损耗最小;2、对业务代码侵入性最低;3、不容易造成数据库和ES数据不一致情况
缺点:1、解析日志方式较为复杂,只能采用第三方开源框架,引入第三方开源框架会增加技术学习成本和运维成本,并且其稳定性不敢保证;
2、需要公司DBA团队配合,需要Oracle开启日志记录等配置,需要开DBA系统账号,另外可能还需要开启Oracle的ASM实例
基于数据日志的增量同步开源框架O2K:https://hub.docker.com/r/woqutech/o2k
3 基于数据库触发器的增量同步方式
通过数据库的触发器监控数据的增加、修改、删除,然后把新增、修改、删除的数据备份一条数据到变更记录表里,通过Java定时器定时同步方式的把变更的数据同步到ES中,具体同步流程参数下图。
优点:1、不会造成数据库和ES数据不一致情况;2、业务代码侵入性低;3、技术实现简单、便捷,就算不依赖第三方框架也能轻松实现(当然也有基于触发器实现的数据同步开源框架,比如:SymmetricDS)
缺点:触发器对insert、update、delete有一定性能损耗
基于触发器实现的开源框架,SymmetricDS文档参考:https://www.symmetricds.org/doc/3.14/html/user-guide.html#_kafka
4 基于数据库通知的增量同步方式
启动应用程序,通过OracleConnection.registerDatabaseChangeNotification(Properties prop).addListener(DatabaseChangeListener listener)的方式接收oracle数据变更通知
优点:1、同步延时低;2、代码侵入性低;
缺点:1、通知特性是Oracle的实验特性,并不稳定,有些版本并不支持;
2、要自己处理数据同步失败的缓冲问题,前面的方案中的MQ和变更记录表就是中间缓冲
基于Oracle通知实现的增量同步开源框架DBSyncer,参考文档:https://gitee.com/ghi/dbsyncer?_from=gitee_search#%E4%BB%8B%E7%BB%8D
import oracle.jdbc.OracleConnection; import oracle.jdbc.OracleStatement; import oracle.jdbc.dcn.*; import oracle.jdbc.pool.OracleDataSource;
import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; import java.util.Properties;
/** * @author tuyou * @date 2023/1/11 17:08 */ public class MyTest
public static void main(String[] args) throws SQLException String tableName = "EXPENSE_GENERAL_ORDER";
OracleDataSource dataSource = new OracleDataSource(); dataSource.setUser("xxx"); dataSource.setPassword("xxx"); dataSource.setURL("jdbc:oracle:thin:@ip:1521/xxx"); final OracleConnection conn = (OracleConnection) dataSource.getConnection();
Properties prop = new Properties(); // prop.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION, "true"); // prop.setProperty(OracleConnection.DCN_NOTIFY_CHANGELAG, "1"); prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true"); prop.setProperty(OracleConnection.NTF_QOS_PURGE_ON_NTFN, "true"); prop.setProperty(OracleConnection.NTF_TIMEOUT, "0");
final DatabaseChangeRegistration databaseChangeRegistration = conn.registerDatabaseChangeNotification(prop); databaseChangeRegistration.addListener(new DatabaseChangeListener() @Override public void onDatabaseChangeNotification(DatabaseChangeEvent databaseChangeEvent) long regId = databaseChangeEvent.getRegId(); System.out.println("change notify: " + Arrays.toString(databaseChangeEvent.getTableChangeDescription()));
if (regId == databaseChangeRegistration.getRegId()) TableChangeDescription[] tds = databaseChangeEvent.getTableChangeDescription(); System.out.println("'TableChangeDescription change count:" + tds.length); for (TableChangeDescription td : tds) System.out.println("table id: " + td.getObjectNumber()); System.out.println("table name: " + td.getTableName()); RowChangeDescription[] rds = td.getRowChangeDescription(); for (RowChangeDescription rd : rds) System.out.println("row id: " + rd.getRowid().stringValue()); System.out.println("row change count: " + rd.getRowOperation().toString());
); OracleStatement statement = (OracleStatement) conn.createStatement(); statement.setDatabaseChangeRegistration(databaseChangeRegistration);
ResultSet resultSet = statement.executeQuery("select * from " + tableName + " where 1=2"); statement.close(); System.err.println("started");
|
5 基于扫描表更新时间的增量同步方式
在数据库表更新时自动更新表的更新时间,然后通过定时任务扫描更新时间大于最后同步时间的数据,拿到更新的数据
优点:1、实现相对简单;2、可以在从库上做扫描,不会影响主库压力;
缺点:1、oracle还是要建立触发器才能自动更新时间,也会存在触发器的缺点;2、对于删除的数据无法根据更新时间发现,只能用ES的数据和数据库的数据做全量对比才能发现删除数据
6 基于Oracle的闪回查询方式的增量同步方式
oracle的闪回查询可以查询最近一段时间的数据的增删改记录,利用这种特性也可以做增量同步
优点:1、实现原理相对简单;2、不用采用触发器
缺点:1、闪回查询有时间保留限制,如果应用由于停机原因可能造成部分数据变动没有同步;2、闪回查询数据量和实际表数据量相关,如果表数据量较大,查询会非常慢
闪回查询sql参考:
(SELECT F_DJBH FROM BF_BIZ_INFO AS OF TIMESTAMP to_timestamp('2023-02-02 10:30:00', 'yyyy-mm-dd hh24:mi:ss') minus SELECT F_DJBH FROM BF_BIZ_INFO) union (SELECT F_DJBH FROM BF_BIZ_INFO minus SELECT F_DJBH FROM BF_BIZ_INFO AS OF TIMESTAMP to_timestamp('2023-02-02 10:30:00', 'yyyy-mm-dd hh24:mi:ss')) |
7 基于Oracle的SCN机制方式的增量同步方式
同步思想5类似,但是更新时间是基于oracle的SCN机制,利用oracle的隐藏的ora_rowscn列来发现更新的数据,参考sql:select ora_rowscn, F_DJBH from BF_BIZ_INFO where ora_rowscn >= '169639743457'
优点:1、实现相对简单;2、可以在从库上做扫描,不会影响主库压力;
缺点:1、对于删除的数据无法根据更新时间发现,只能用ES的数据和数据库的数据做全量对比才能发现删除数据或者建立一个delete触发器
8 其他ETL开源平台
DataX:采用写sql的扫描数据,不适合做线上数据同步,只适合离线数据同步。官方文档:https://github.com/alibaba/DataX/blob/master/introduction.md
同步案例: https://blog.csdn.net/weixin_42418589/article/details/126019261
Tapdata:开源版本并不支持Oracle,可以支持mysql
最终选择
基于对我们系统现状的分析,最终确定的方案选择是基于Oracle的SCN机制 + 触发器来实现增量同步数据到ES,不依赖开源框架,基于Oracle的SCN机制实现insert/update数据的增量同步,基于delete触发器实现delete数据的增量同步。具体同步逻辑如下图。
以上是关于ES增量同步方案的主要内容,如果未能解决你的问题,请参考以下文章
干货 | Debezium实现Mysql到Elasticsearch高效实时同步