ES增量同步方案

Posted 涂有

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ES增量同步方案相关的知识,希望对你有一定的参考价值。

1 基于业务代码嵌入式的增量同步方式

在Java业务代码要修改业务数据的地方,增加调用写入ES数据的方法

优点:1、实现方式简单,可控粒度高;2、不依赖第三方数据同步框架;3、数据库不用做特殊配置和部署;

缺点:1:所有修改数据的地方都要添加同步ES逻辑,新增接口或者修改代码要同时关注ES的同步逻辑,否则可能导致数据库和ES的数据不一致;

2:如果直接提sql修数据,会造成数据库和ES数据不一致;

3:要自己处理同步失败的重试问题

2 基于数据库离线日志的增量同步方式

通过监控oracle的redo log日志和在线日志的方式,把日志文件还原成sql,把sql发送到MQ中间件,Java消费MQ数据把数据的变更同步到ES,同步流程参考下图。

优化:1、对数据库性能损耗最小;2、对业务代码侵入性最低;3、不容易造成数据库和ES数据不一致情况

缺点:1、解析日志方式较为复杂,只能采用第三方开源框架,引入第三方开源框架会增加技术学习成本和运维成本,并且其稳定性不敢保证;

2、需要公司DBA团队配合,需要Oracle开启日志记录等配置,需要开DBA系统账号,另外可能还需要开启Oracle的ASM实例

基于数据日志的增量同步开源框架O2K:https://hub.docker.com/r/woqutech/o2k

3 基于数据库触发器的增量同步方式

通过数据库的触发器监控数据的增加、修改、删除,然后把新增、修改、删除的数据备份一条数据到变更记录表里,通过Java定时器定时同步方式的把变更的数据同步到ES中,具体同步流程参数下图。

优点:1、不会造成数据库和ES数据不一致情况;2、业务代码侵入性低;3、技术实现简单、便捷,就算不依赖第三方框架也能轻松实现(当然也有基于触发器实现的数据同步开源框架,比如:SymmetricDS)

缺点:触发器对insert、update、delete有一定性能损耗

基于触发器实现的开源框架,SymmetricDS文档参考:https://www.symmetricds.org/doc/3.14/html/user-guide.html#_kafka

4 基于数据库通知的增量同步方式

启动应用程序,通过OracleConnection.registerDatabaseChangeNotification(Properties prop).addListener(DatabaseChangeListener listener)的方式接收oracle数据变更通知

优点:1、同步延时低;2、代码侵入性低;

缺点:1、通知特性是Oracle的实验特性,并不稳定,有些版本并不支持;

2、要自己处理数据同步失败的缓冲问题,前面的方案中的MQ和变更记录表就是中间缓冲

基于Oracle通知实现的增量同步开源框架DBSyncer,参考文档:https://gitee.com/ghi/dbsyncer?_from=gitee_search#%E4%BB%8B%E7%BB%8D

import oracle.jdbc.OracleConnection;

import oracle.jdbc.OracleStatement;

import oracle.jdbc.dcn.*;

import oracle.jdbc.pool.OracleDataSource;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.util.Arrays;

import java.util.Properties;

/**

* @author tuyou

* @date 2023/1/11 17:08

*/

public class MyTest

public static void main(String[] args) throws SQLException

String tableName = "EXPENSE_GENERAL_ORDER";

OracleDataSource dataSource = new OracleDataSource();

dataSource.setUser("xxx");

dataSource.setPassword("xxx");

dataSource.setURL("jdbc:oracle:thin:@ip:1521/xxx");

final OracleConnection conn = (OracleConnection) dataSource.getConnection();

Properties prop = new Properties();

// prop.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION, "true");

// prop.setProperty(OracleConnection.DCN_NOTIFY_CHANGELAG, "1");

prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");

prop.setProperty(OracleConnection.NTF_QOS_PURGE_ON_NTFN, "true");

prop.setProperty(OracleConnection.NTF_TIMEOUT, "0");

final DatabaseChangeRegistration databaseChangeRegistration = conn.registerDatabaseChangeNotification(prop);

databaseChangeRegistration.addListener(new DatabaseChangeListener()

@Override

public void onDatabaseChangeNotification(DatabaseChangeEvent databaseChangeEvent)

long regId = databaseChangeEvent.getRegId();

System.out.println("change notify: " + Arrays.toString(databaseChangeEvent.getTableChangeDescription()));

if (regId == databaseChangeRegistration.getRegId())

TableChangeDescription[] tds = databaseChangeEvent.getTableChangeDescription();

System.out.println("'TableChangeDescription change count:" + tds.length);

for (TableChangeDescription td : tds)

System.out.println("table id: " + td.getObjectNumber());

System.out.println("table name: " + td.getTableName());

RowChangeDescription[] rds = td.getRowChangeDescription();

for (RowChangeDescription rd : rds)

System.out.println("row id: " + rd.getRowid().stringValue());

System.out.println("row change count: " + rd.getRowOperation().toString());

);

OracleStatement statement = (OracleStatement) conn.createStatement();

statement.setDatabaseChangeRegistration(databaseChangeRegistration);

ResultSet resultSet = statement.executeQuery("select * from " + tableName + " where 1=2");

statement.close();

System.err.println("started");

5 基于扫描表更新时间的增量同步方式

在数据库表更新时自动更新表的更新时间,然后通过定时任务扫描更新时间大于最后同步时间的数据,拿到更新的数据

优点:1、实现相对简单;2、可以在从库上做扫描,不会影响主库压力;

缺点:1、oracle还是要建立触发器才能自动更新时间,也会存在触发器的缺点;2、对于删除的数据无法根据更新时间发现,只能用ES的数据和数据库的数据做全量对比才能发现删除数据

6 基于Oracle的闪回查询方式的增量同步方式

oracle的闪回查询可以查询最近一段时间的数据的增删改记录,利用这种特性也可以做增量同步

优点:1、实现原理相对简单;2、不用采用触发器

缺点:1、闪回查询有时间保留限制,如果应用由于停机原因可能造成部分数据变动没有同步;2、闪回查询数据量和实际表数据量相关,如果表数据量较大,查询会非常慢

闪回查询sql参考:

(SELECT F_DJBH FROM BF_BIZ_INFO AS OF TIMESTAMP to_timestamp('2023-02-02 10:30:00', 'yyyy-mm-dd hh24:mi:ss')

minus

SELECT F_DJBH FROM BF_BIZ_INFO)

union

(SELECT F_DJBH FROM BF_BIZ_INFO

minus

SELECT F_DJBH FROM BF_BIZ_INFO AS OF TIMESTAMP to_timestamp('2023-02-02 10:30:00', 'yyyy-mm-dd hh24:mi:ss'))

7 基于Oracle的SCN机制方式的增量同步方式

同步思想5类似,但是更新时间是基于oracle的SCN机制,利用oracle的隐藏的ora_rowscn列来发现更新的数据,参考sql:select ora_rowscn, F_DJBH from BF_BIZ_INFO where ora_rowscn >= '169639743457'

优点:1、实现相对简单;2、可以在从库上做扫描,不会影响主库压力;

缺点:1、对于删除的数据无法根据更新时间发现,只能用ES的数据和数据库的数据做全量对比才能发现删除数据或者建立一个delete触发器

8 其他ETL开源平台

DataX:采用写sql的扫描数据,不适合做线上数据同步,只适合离线数据同步。官方文档:https://github.com/alibaba/DataX/blob/master/introduction.md

同步案例: https://blog.csdn.net/weixin_42418589/article/details/126019261

Tapdata:开源版本并不支持Oracle,可以支持mysql

最终选择

基于对我们系统现状的分析,最终确定的方案选择是基于Oracle的SCN机制 + 触发器来实现增量同步数据到ES,不依赖开源框架,基于Oracle的SCN机制实现insert/update数据的增量同步,基于delete触发器实现delete数据的增量同步。具体同步逻辑如下图。

以上是关于ES增量同步方案的主要内容,如果未能解决你的问题,请参考以下文章

Canal——增量同步MySQL数据到ES

canal 系列:ES中nested嵌套类型同步

干货 | Debezium实现Mysql到Elasticsearch高效实时同步

MySQL到Elasticsearch实时同步构建数据检索服务的选型与思考

Elasticsearch的灾备和恢复

同步MySQL数据到ES神器mysqlmom介绍