数据库,增量同步和全量同步是啥?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据库,增量同步和全量同步是啥?相关的知识,希望对你有一定的参考价值。

增量同步和全量同步是数据库同步的两种方式。全量同步是一次性同步全部数据,增量同步则只同步两个数据库不同的部分。

数据库简介:

数据库(Database)是按照数据结构来组织、存储和管理数据的仓库,它产生于距今六十多年前,随着信息技术和市场的发展,特别是二十世纪九十年代以后,数据管理不再仅仅是存储和管理数据,而转变成用户所需要的各种数据管理的方式。数据库有很多种类型,从最简单的存储有各种数据的表格到能够进行海量数据存储的大型数据库系统都在各个方面得到了广泛的应用。

参考技术A 全量备份是指对某一时间点上的所有数据进行全量备份,包括系统和所有数据。这种备份方式每次都需要对系统和所有数据进行一次全量备份。如上,如果两次备份之间数据没有任何变化,那么两次备份的数据是一样的。也就是说100GB的数据即使没有发生任何数据变化,也会多耗费100GB的存储空间去做备份。但这种备份方式最大的好处就是在恢复丢失数据时,只需要对一个完整的备份进行操作就能够恢复丢失数据,大大加快了系统或数据恢复的时间。
增量备份即在第一次全量备份的基础上,分别记录每次的变化。由于增量备份在备份前会判断数据是否发生变化,并仅记录每次变化情况,所以相较于其他两种备份方式它最大的好处在于其所需存储空间最少的(相同的变化情况下),备份速度最快的。当然在数据还原上来说,它的恢复时间是最长的,效率较低。恢复数据时,需要在第一次完备的基础上,整合每次的一个变化情况。
参考技术B 数据如果保留多份,就会存在一致性问题,就需要同步,同步分为两大类:全量和增量
2. 概述
数据如果要保留副本,要么同时写(就是多写),或者进行复制:异步写(即从主数据拷贝到副本);
同时写(多写),引出一个问题,写多少节点算成功(场景:分布式系统)?全部写成功才算成功,还是写大多数成功算成功,还是写指定几个节点算成功?
异步写的话,如果采用异步复制,那么实时性需要考量的话,就需要采用性能优先的架构。
3.同步方式
数据同步一般分为两种方式:全量和增量。
3.1 全量
全量,这个很好理解。就是每天定时或者周期性全量把数据从一个地方拷贝到另外一个地方;
全量的话,可以采用直接全部覆盖(使用“新”数据覆盖“旧”数据);或者走更新逻辑(覆盖前判断下,如果新旧不一致,就更新);
这里面有一个隐藏的问题:如果采用异步写,主数据物理删除了,怎么直接通过全量数据同步?这就需要借助一些中间操作日志文件,或者其他手段,把这些“看不到”的数据记录起来。
3.2 增量(类如有;坚果云网盘增量同步功能)
增量的基础是全量,就是你要使用某种方式先把全量数据拷贝过来,然后再采用增量方式同步更新。
增量的话,就是指抓取某个时刻(更新时间)或者检查点(checkpoint)以后的数据来同步,不是无规律的全量同步。这里引入一个关键性的前提:副本一端要记录或者知道(通过查询更新日志或者订阅更新)哪些更新了。

mongodb监听oplog 全量+增量同步

一、前言

前一个项目中,涉及到了一次数据迁移,这次迁移需要从mongodb迁移到另一个mongodb实例上,两个源的数据结构是不一样的。涉及到增量和全量数据迁移,整体迁移数据量在5亿左右。本篇即讲理论,也讲实战,往下看↓!

二、迁移思路

通常的增量和全量迁移,思路基本一致:

  1. 在开启全量的时候,开始增量监听,记录下增量的主键id
  2. 当全量执行结束的时候,从新跑一边记录的增量主键id的记录,根据getbyId查询一下最新的记录,再upsert到新库中。

    思路就是这么样的。

三、同步实战

全量同步

全量的操作是比较简单的,这里我们需要找到一个排序的键,然后一直从旧库不断的捞数据,改变数据个时候,再更新到新表中,我这里的主键是递增的,所以我会根据主键id来进行循环获取,开始的时候还会记录一下最大的主键,当我们执行到最大的主键的时候,全量就结束了。

我写了一个http接口,主动调用来全量同步数据。

    /**
     * 全量同步
     *
     * @return 是否成功
     */
    @RequestMapping("/fullData")
    public Boolean fullData() 
    	//获取主键最大值
        Query query = new Query();
        query.with(new Sort(Sort.Direction.DESC, "_id"));
        query.limit(1);
        UserCompleteCountDTO max = firstMongoTemplate.findOne(query, UserCompleteCountDTO.class);
        Integer maxUserId = max.getUserId();

        Integer step = 100;
        Integer beginId = 1;
        Integer totalCount = 0;
        while (true) 
            logger.info("beginId:", beginId);
            Criteria criteria = new Criteria().where("_id").gte(beginId);
            Query queryStep = new Query(criteria);
            queryStep.limit(step).with(new Sort(new Sort.Order(Sort.Direction.ASC, "_id")));
            List<UserCompleteCountDTO> historyDTOS = new ArrayList<>();
            try 
                historyDTOS = firstMongoTemplate.find(queryStep, UserCompleteCountDTO.class);
             catch (Exception e) 
                List<UserCompleteCountIdDTO> historyIdDTOS = new ArrayList<>();
                historyIdDTOS = firstMongoTemplate.find(queryStep, UserCompleteCountIdDTO.class);
                if (!CollectionUtils.isEmpty(historyIdDTOS)) 
                    for (UserCompleteCountIdDTO idDTO : historyIdDTOS) 
                        int userId = idDTO.getUserId();
                        try 
                            Criteria criteriaE = new Criteria().where("_id").is(userId);
                            Query queryE = new Query(criteriaE);
                            UserCompleteCountDTO one = firstMongoTemplate.findOne(queryE, UserCompleteCountDTO.class);
                            if (null != one) 
                                historyDTOS.add(one);
                            
                         catch (Exception e1) 
                            logger.error("全量查询失败:id:", userId);
                            errorIdMapper.insert(userId);
                        
                    
                
            
            totalCount = fullSync(historyDTOS, totalCount);
            //判断全量是否完成
            if ((CollectionUtils.isEmpty(historyDTOS) || historyDTOS.size() < step) && (beginId + step) >= maxUserId) 
                logger.info("全量同步结束!");
                break;
            
            UserCompleteCountDTO last = historyDTOS.get(historyDTOS.size() - 1);
            beginId = last.getUserId() + 1;

            try 
                Thread.sleep(5);
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
        return true;
    

 private Integer fullSync(List<UserCompleteCountDTO> list, Integer totalCount) 
        if (CollectionUtils.isEmpty(list)) 
            return totalCount;
        

        //同步数据库
        List<DataDTO> insertDataList = new ArrayList<>();
        for (UserCompleteCountDTO old : list) 
            List<DataDTO> dataDTOS = coverDataDTOList(old);
            //赋值
            insertDataList.addAll(dataDTOS);
        
        ExecutorService executor = Executors.newFixedThreadPool(20);

        try 
            if (!CollectionUtils.isEmpty(insertDataList)) 
                List<List<DataDTO>> partition = Lists.partition(insertDataList, 100);
                CountDownLatch countDownLatch = new CountDownLatch(partition.size());
                for (List<DataDTO> partList : partition) 
                    ImportTask task = new ImportTask(partList, countDownLatch);
                    executor.execute(task);
                
                countDownLatch.await();
                totalCount = totalCount + list.size();
            
            logger.info("totalCount:", totalCount);
         catch (Exception e) 
            logger.error("批量插入数据失败");
         finally 
            // 关闭线程池,释放资源
            executor.shutdown();
        

        return totalCount;
    


    class ImportTask implements Runnable 
        private List list;
        private CountDownLatch countDownLatch;

        public ImportTask(List data, CountDownLatch countDownLatch) 
            this.list = data;
            this.countDownLatch = countDownLatch;
        

        @Override
        public void run() 
            if (null != list) 
                // 业务逻辑,例如批量insert或者update
                BulkOperations operations = secondMongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, "xxxxx");
                operations.insert(list);
                BulkWriteResult result = operations.execute();
            
            // 发出线程任务完成的信号
            countDownLatch.countDown();
        
    

增量同步

增量同步需要我们来监听mongodb的日志:oplog。

什么是oplog?
oplog用于存储mongodb的增删改 和一些系统命令,查询的不会记录。类似于mysql的binlog日志。
mongodb的副本同步就是利用oplog进行同步的。主节点接收请求操作,然后记录在oplog中,副本节点异步复制这些操作。

oplog存在哪里?

oplog在local库:
master/slave 架构下
local.oplog.$main;
replica sets 架构下:
local.oplog.rs
sharding 架构下,mongos下不能查看oplog,可到每一片去看。

mongodb 监听代码:
我这里是把监听到的主键id记录到mysql中,当全量完成后,再开启增量同步,读取mysql中数据,同步到新表中

package com.soybean.data.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mongodb.BasicDBObject;
import com.mongodb.CursorType;
import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.soybean.data.mapper.IdMapper;
import com.soybean.data.util.MongoDBUtil;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Component
public class MongoDBOpLogService implements CommandLineRunner 

    private static final Logger logger = LoggerFactory.getLogger(MongoDBOpLogService.class);

    private static MongoClient mongoClient;

    @Autowired
    private IdMapper idMapper;

    /**
     * 服务启动记录增量数据到mysql
     * @param strings
     * @throws Exception
     */
    @Override
    public void run(String... strings) throws Exception 
        initMongoClient();
        //获取local库
        MongoDatabase database = getDatabase("local");
        //监控库oplog.$main
        MongoCollection<Document> runoob = getCollection(database, "oplog.rs");
        try 
            //处理
            dataProcessing(runoob);
         catch (Exception e) 
            logger.error("error:", e);
        

    


    private static void initMongoClient() 
        try 
            mongoClient = MongoDBUtil.initMongoHasUser();
         catch (IOException e) 
            e.printStackTrace();
        
    

    public static MongoDatabase getDatabase(String dataBase) 
        if (!mongoClient.getDatabaseNames().contains(dataBase)) 
            throw new RuntimeException(dataBase + " no exist !");
        
        MongoDatabase mongoDatabase = mongoClient.getDatabase(dataBase);
        return mongoDatabase;
    

    /**
     * 获取表对象
     *
     * @param mongoDatabase
     * @param testCollection
     * @return
     */
    public static MongoCollection<Document> getCollection(MongoDatabase mongoDatabase, String testCollection) 
        MongoCollection<Document> collection = null;
        try 
            //获取数据库dataBase下的集合collecTion,如果没有将自动创建
            collection = mongoDatabase.getCollection(testCollection);
         catch (Exception e) 
            throw new RuntimeException("获取" + mongoDatabase.getName() + "数据库下的" + testCollection + "集合 failed !" + e);
        
        return collection;
    


    /**
     * 获取数据流处理标准化
     *
     * @param collection
     * @throws InterruptedException
     */
    public void dataProcessing(MongoCollection<Document> collection) throws InterruptedException 
        //-1倒叙,初始化程序时,取最新的ts时间戳,监听mongodb日志,进行过滤,这里的ts尽量做到,服务停止时,存储到文件或者库,获取最新下标

        FindIterable<Document> tsCursor = collection.find().sort(new BasicDBObject("$natural", -1)).limit(1);
        Document tsDoc = tsCursor.first();
        BsonTimestamp queryTs = (BsonTimestamp) tsDoc.get("ts");
        try 
            Integer index = 1;
            List<Integer> batchIds = new ArrayList<>();
            while (true) 
                BasicDBObject query = new BasicDBObject("ts", new BasicDBObject("$gt", queryTs));
                MongoCursor docCursor = collection.find(query)
                        .cursorType(CursorType.TailableAwait) //没有数据时阻塞休眠
                        .noCursorTimeout(true) //防止服务器在不活动时间(10分钟)后使空闲的游标超时。
                        .oplogReplay(true) //结合query条件,获取增量数据,这个参数比较难懂,见:https://docs.mongodb.com/manual/reference/command/find/index.html
                        .maxAwaitTime(1, TimeUnit.SECONDS) //设置此操作在服务器上的最大等待执行时间
                        .iterator();

                while (docCursor.hasNext()) 
                    Document document = (Document) docCursor.next();
                    //更新查询时间戳
                    queryTs = (BsonTimestamp) document.get("ts");
                    String op = document.getString("op");
                    String database = document.getString("ns");
                    if (!"resourcebehaviorsystem.playCompleted".equalsIgnoreCase(database)) 
                        continue;
                    

                    Document context = (Document) document.get("o");
                    Document where = null;
                    Integer id = null;
                    if (op.equals("u")) 
                        where = (Document) document.get("o2");
                        id = Integer.valueOf(String.valueOf(where.get("_id")));
                        if (context != null) 
                            context = (Document) context.get("$set");
                        
                    

                    if (op.equals("i")) 
                        if (context != null) 
                            id = Integer.valueOf(String.valueOf(context.get("_id")));
                            context = (Document) context.get("$set");
                        
                    
                    logger.info("操作时间戳:" + queryTs.getTime());
                    logger.以上是关于数据库,增量同步和全量同步是啥?的主要内容,如果未能解决你的问题,请参考以下文章

mongodb监听oplog 全量+增量同步

mongodb监听oplog 全量+增量同步

DATAX如何增量同步数据

Redis主从之全量复制和增量复制

Flink 实战系列Flink CDC 实时同步 Mysql 全量加增量数据到 Hudi

Flink 实战系列Flink CDC 实时同步 Mysql 全量加增量数据到 Hudi