如何优雅地用1小时处理1600万条数据库记录

Posted 纵横千里,捭阖四方

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何优雅地用1小时处理1600万条数据库记录相关的知识,希望对你有一定的参考价值。

有些业务场景发生改变时经常需要先修复历史数据,例如最近双清单政策出来之后,要求对用户敏感数据都要加密,主要是姓名、手机号和地址三个字段。过度方案很简单,先为三个字段 分别创建一个对应的name_encrypt,phone_encrypt和address_encrypt,先将对应的数据都加密保存起来,上线稳定后再将明文字段name,phone和address删掉。现在有个服务有1600万条记录,每条记录都要先处理一遍,那此时该如何做比较好呢?我们通过进阶的方式来逐步展开。

第一级:多线程执行

这种修理数据的程序一般只使用一次,修复完了就可以关闭了,而执行的时候为了快,我们必然会选择多线程方式来进行。而内存是无法一次加载全部数据的,所以还需要使用分页的方式来进行,我是这么做的:

【1】分页设计

首先为了分页,我们需要知道数据表的最大Id和最小Id,需要先写两个查询接口getMaxRecordId()和getMinRecordId()对应的sql就是:

select max(id) from ***;
select min(id) from ***;

接下来我们要获得一个分页对应的所有Id,这里之所以用id,而不直接返回对应的Dto,是因为我想复用来处理其他的数据表,代码就是:

 List<Long> ids = iEncToolService.getUnEncRecordsByPage(start, STEP);
对应的sql就是:
String sql = "select id from ** where id>=  ? and  name_encrypt= '' order by id  limit ?";

上面的STEP就是一页的大小。

接下来就是根据Id一个个去处理了,这时候就要考虑多线程的设计了。这里我们先看最后处理完之后如何获得下一页的起始位置。

使用这行代码能准确获得下一页地址吗:

start=start+STEP

答案是不行,因为id编号可能存在跳跃的情况,例如某些记录删除了,或者分库分表等情况都会导致Id不连续,所以不能这么做,一种方式是从本轮处理的id中找到最大的那一个,然后加1作为下一轮的开始,也就是这样:

    private long getMaxId(long start, List<Long> ids) 
        long maxId = ids.stream().mapToLong(id -> id).max().getAsLong();
        return Math.max(start, maxId);
    

然后下一轮的start这么写就行了:

 start = getMaxId(start, ids) + 1;

假如我们是从DTO中找Id最大,上面的getMaxId()方法可以这么写:

    private long getMaxId(long start, List<InfoDto> ids) 
        long maxId = ids.stream().mapToLong(t -> t.getId()).max().getAsLong();
        return Math.max(start, maxId);
    

这样,我们就可以通过STEP的大小来控制每次读取的量,然后就一波一波的执行了。

【2】线程设计

要同时处理这么多数据,肯定是要使用多线程方式的,为此我建立了一个线程池:

 BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1000);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 16, Integer.MAX_VALUE, TimeUnit.SECONDS, queue);

这里的队列大小经过几次测试发现影响不大,但是为了平稳执行,最好比分页STEP大20%以上比较好。

然后就是遍历id,依次放入到线程中。代码如下:

for (long id : ids) 
                waitToProcess(queue, id);
                executor.submit(() -> iEncToolService.updateRecord(id));
            

这里的waitToProcess()是我们人为增加的一个等待操作,没有业务,当队列里任务过多时,我们人为增加等待时间,实现如下:

private void waitToProcess(BlockingQueue<Runnable> queue, long userId) 

        while (queue.size() > 150) 
            try 
                TimeUnit.MILLISECONDS.sleep(20);
             catch (Exception e) 
                log.error(" update user points record error,userId:", userId, e);
            
        
    

这样一个简易的批处理功能就实现了,将数据表处理完之后就自动关闭了。完整代码如下:

private void unShardDataProcess(String[] strings) 
        long start = 0, endId;

        String dataTable = strings[1];

        endId = iEncToolService.getMaxRecordId(dataTable, 0);

        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1000);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 16, Integer.MAX_VALUE, TimeUnit.SECONDS, queue);

        while (start <= endId) 
            List<BatchEncInfoDto> dtos = iEncToolService.getUnShardUnEncRecordsByPage(dataTable, start, STEP);

            if (CollectionUtils.isEmpty(dtos)) 
                break;
            
            log.info("user ids:" + dtos);
            for (BatchEncInfoDto dto : dtos) 
                waitToProcess(queue, dto.getId());
                executor.submit(() -> iEncToolService.updateRecordForEnc(dataTable, dtos));
            

            log.info("sync update ,start Id=,len=", start, STEP);
            start = this.getMaxIdFromDto(start, dtos) + 1;
        
        executor.shutdown();
        try 
            executor.awaitTermination(10, TimeUnit.MINUTES);
         catch (Exception e) 
            log.error(" shutdown update executor error", e);
        

    

这个方法每秒能处理几百条记录,因此处理一个一二十万条记录规模的数据还是可以的。

第二级  数据库批处理

上面这个方法的一个问题是每处理一条记录都要进行一次线程切换,还要与数据库进行一次通信,数据量大时会消耗大量资源。这时候想提高处理效率,批处理是一个不错的选择。

我们每次从数据库中获得一个分页,例如50条,可以使用批处理方式将其拼接程一个batch操作,在一个线程里,与DB进行一次通信就完成。这个操作与你使用ORM框架不同会有所不同,基本思路是一样的,我这里的已经被公司的框架封装过,基本实现如下:

 public boolean updateRecordForEnc(List<BatchEncInfoDto> ids) 

        if (CollectionUtils.isEmpty(ids)) 
            log.info("info ids is null");
            return false;
        
        int shardId = IdUtils.parse(ids.get(0).getId()).getShard();

        List<Object[]> batch = new ArrayList<>();

        AESUtils aesEncrypt = aesEncryptHelper.getOrderEncrypt();

        String nameEncrypt = "", phoneEncrypt = "", addressEncrypt = "";
        for (BatchEncInfoDto dto : ids) 
            try 
                nameEncrypt = aesEncrypt.encrypt(dto.getName());
                phoneEncrypt = aesEncrypt.encrypt(dto.getPhone());
                addressEncrypt = aesEncrypt.encrypt(dto.getAddress());
             catch (GeneralCryptoException e) 
                log.info("enc error , dto:", e, dto);
            
            Object[] values = new Object[]
                    nameEncrypt,
                    phoneEncrypt,
                    addressEncrypt,
                    dto.getId()
            ;
            batch.add(values);

        
        String sql = "update *** set name_encrypt = ?, phone_encrypt = ?, address_encrypt = ? where id = ?";
        int[] updateCounts = batchUpdate(shardId, sql, batch);
        if (updateCounts.length != ids.size()) 
            log.info("expressAddress :", ids.toString());
            return false;
        
        return true;

    

这里的关键点是拼接values数组,最后由batchUpdate一次完成一页数据的更新。

这样做的效率明显比单条记录要高,理论上一次批操作越多,速度也快,但是一般一两百条一次比较好。

我使用的一页是50条,处理一个200W条记录的表大约需要半小时。

第三级 分库执行

如果是分库分表情况下,上面的操作仍然是一个库一个库操作的,假如分了8个库,那就需要接近5小时才能完成,速度还是太慢,那该怎么办呢?

我们为每个库分表建立线程,让多个库同时执行,这样的话速度自然会提高不少,大致代码如下:

 int shardSize = **Service.queryJdbcShardSize();
        final int len = 10000;
         
        long startTime = System.currentTimeMillis();

        for (int jdbcShard = 0; jdbcShard < shardSize; jdbcShard++) 
            int finalJdbcShard = jdbcShard;
            log.info("finalJdbcShard = ", jdbcShard);

            syncEsExecutor.execute(() -> 
                long minId = **Service.getShardMinId(finalJdbcShard);
                long maxId = **Service.getShardMaxId(finalJdbcShard);
                log.info(" jdbcShard=, minId=, maxId=", finalJdbcShard, minId, maxId);
 int shardSize = **Service.queryJdbcShardSize();
        final int len = 10000;
  
        long startTime = System.currentTimeMillis();

        for (int jdbcShard = 0; jdbcShard < shardSize; jdbcShard++) 
            int finalJdbcShard = jdbcShard;
            log.info("finalJdbcShard = ", jdbcShard);

            syncEsExecutor.execute(() -> 
                long minId = **Service.getShardMinId(finalJdbcShard);
                long maxId = **Service.getShardMaxId(finalJdbcShard);
                log.info("Sync one shard kefu message, jdbcShard=, minId=, maxId=", finalJdbcShard, minId, maxId);

                while (true) 
                   
                    List<**Message> shardPage = **Service.getByShardPage(finalJdbcShard, minId, len);

                    if (minId > maxId) 
                        log.info("Sync one page message end, jdbcShard=, minId=", finalJdbcShard, minId);
                        break;
                    

                    List<**IndexItem> messageIndexItems = **Service.***IndexItem(shardPage);
                    indexService.adds(messageIndexItems);

                    minId = shardPage.stream().mapToLong(Message::getId).max().getAsLong() + 1;
                    log.info("Sync one page message end, jdbcShard=, minId=", finalJdbcShard, minId);
                
            );
        

        try 
            syncEsExecutor.shutdown();
            boolean result = syncEsExecutor.awaitTermination(2, TimeUnit.DAYS);
                 
            ****
            );
        

       

大致代码如上所示。这样执行的话,一个1600万的数据,大约1小时就你能处理完。

以上是关于如何优雅地用1小时处理1600万条数据库记录的主要内容,如果未能解决你的问题,请参考以下文章

如何优雅地用Redis实现分布式锁?

如何优雅地用Redis实现分布式锁

如何优雅地用Redis实现分布式锁?

如何优雅地用Redis实现分布式锁?

mysql如何更快地插入数百万条记录? [关闭]

如何通过java程序更快地选择和插入百万条记录