如何优雅地用1小时处理1600万条数据库记录
Posted 纵横千里,捭阖四方
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何优雅地用1小时处理1600万条数据库记录相关的知识,希望对你有一定的参考价值。
有些业务场景发生改变时经常需要先修复历史数据,例如最近双清单政策出来之后,要求对用户敏感数据都要加密,主要是姓名、手机号和地址三个字段。过度方案很简单,先为三个字段 分别创建一个对应的name_encrypt,phone_encrypt和address_encrypt,先将对应的数据都加密保存起来,上线稳定后再将明文字段name,phone和address删掉。现在有个服务有1600万条记录,每条记录都要先处理一遍,那此时该如何做比较好呢?我们通过进阶的方式来逐步展开。
第一级:多线程执行
这种修理数据的程序一般只使用一次,修复完了就可以关闭了,而执行的时候为了快,我们必然会选择多线程方式来进行。而内存是无法一次加载全部数据的,所以还需要使用分页的方式来进行,我是这么做的:
【1】分页设计
首先为了分页,我们需要知道数据表的最大Id和最小Id,需要先写两个查询接口getMaxRecordId()和getMinRecordId()对应的sql就是:
select max(id) from ***;
select min(id) from ***;
接下来我们要获得一个分页对应的所有Id,这里之所以用id,而不直接返回对应的Dto,是因为我想复用来处理其他的数据表,代码就是:
List<Long> ids = iEncToolService.getUnEncRecordsByPage(start, STEP);
对应的sql就是:
String sql = "select id from ** where id>= ? and name_encrypt= '' order by id limit ?";
上面的STEP就是一页的大小。
接下来就是根据Id一个个去处理了,这时候就要考虑多线程的设计了。这里我们先看最后处理完之后如何获得下一页的起始位置。
使用这行代码能准确获得下一页地址吗:
start=start+STEP
答案是不行,因为id编号可能存在跳跃的情况,例如某些记录删除了,或者分库分表等情况都会导致Id不连续,所以不能这么做,一种方式是从本轮处理的id中找到最大的那一个,然后加1作为下一轮的开始,也就是这样:
private long getMaxId(long start, List<Long> ids)
long maxId = ids.stream().mapToLong(id -> id).max().getAsLong();
return Math.max(start, maxId);
然后下一轮的start这么写就行了:
start = getMaxId(start, ids) + 1;
假如我们是从DTO中找Id最大,上面的getMaxId()方法可以这么写:
private long getMaxId(long start, List<InfoDto> ids)
long maxId = ids.stream().mapToLong(t -> t.getId()).max().getAsLong();
return Math.max(start, maxId);
这样,我们就可以通过STEP的大小来控制每次读取的量,然后就一波一波的执行了。
【2】线程设计
要同时处理这么多数据,肯定是要使用多线程方式的,为此我建立了一个线程池:
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1000);
ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 16, Integer.MAX_VALUE, TimeUnit.SECONDS, queue);
这里的队列大小经过几次测试发现影响不大,但是为了平稳执行,最好比分页STEP大20%以上比较好。
然后就是遍历id,依次放入到线程中。代码如下:
for (long id : ids)
waitToProcess(queue, id);
executor.submit(() -> iEncToolService.updateRecord(id));
这里的waitToProcess()是我们人为增加的一个等待操作,没有业务,当队列里任务过多时,我们人为增加等待时间,实现如下:
private void waitToProcess(BlockingQueue<Runnable> queue, long userId)
while (queue.size() > 150)
try
TimeUnit.MILLISECONDS.sleep(20);
catch (Exception e)
log.error(" update user points record error,userId:", userId, e);
这样一个简易的批处理功能就实现了,将数据表处理完之后就自动关闭了。完整代码如下:
private void unShardDataProcess(String[] strings)
long start = 0, endId;
String dataTable = strings[1];
endId = iEncToolService.getMaxRecordId(dataTable, 0);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1000);
ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 16, Integer.MAX_VALUE, TimeUnit.SECONDS, queue);
while (start <= endId)
List<BatchEncInfoDto> dtos = iEncToolService.getUnShardUnEncRecordsByPage(dataTable, start, STEP);
if (CollectionUtils.isEmpty(dtos))
break;
log.info("user ids:" + dtos);
for (BatchEncInfoDto dto : dtos)
waitToProcess(queue, dto.getId());
executor.submit(() -> iEncToolService.updateRecordForEnc(dataTable, dtos));
log.info("sync update ,start Id=,len=", start, STEP);
start = this.getMaxIdFromDto(start, dtos) + 1;
executor.shutdown();
try
executor.awaitTermination(10, TimeUnit.MINUTES);
catch (Exception e)
log.error(" shutdown update executor error", e);
这个方法每秒能处理几百条记录,因此处理一个一二十万条记录规模的数据还是可以的。
第二级 数据库批处理
上面这个方法的一个问题是每处理一条记录都要进行一次线程切换,还要与数据库进行一次通信,数据量大时会消耗大量资源。这时候想提高处理效率,批处理是一个不错的选择。
我们每次从数据库中获得一个分页,例如50条,可以使用批处理方式将其拼接程一个batch操作,在一个线程里,与DB进行一次通信就完成。这个操作与你使用ORM框架不同会有所不同,基本思路是一样的,我这里的已经被公司的框架封装过,基本实现如下:
public boolean updateRecordForEnc(List<BatchEncInfoDto> ids)
if (CollectionUtils.isEmpty(ids))
log.info("info ids is null");
return false;
int shardId = IdUtils.parse(ids.get(0).getId()).getShard();
List<Object[]> batch = new ArrayList<>();
AESUtils aesEncrypt = aesEncryptHelper.getOrderEncrypt();
String nameEncrypt = "", phoneEncrypt = "", addressEncrypt = "";
for (BatchEncInfoDto dto : ids)
try
nameEncrypt = aesEncrypt.encrypt(dto.getName());
phoneEncrypt = aesEncrypt.encrypt(dto.getPhone());
addressEncrypt = aesEncrypt.encrypt(dto.getAddress());
catch (GeneralCryptoException e)
log.info("enc error , dto:", e, dto);
Object[] values = new Object[]
nameEncrypt,
phoneEncrypt,
addressEncrypt,
dto.getId()
;
batch.add(values);
String sql = "update *** set name_encrypt = ?, phone_encrypt = ?, address_encrypt = ? where id = ?";
int[] updateCounts = batchUpdate(shardId, sql, batch);
if (updateCounts.length != ids.size())
log.info("expressAddress :", ids.toString());
return false;
return true;
这里的关键点是拼接values数组,最后由batchUpdate一次完成一页数据的更新。
这样做的效率明显比单条记录要高,理论上一次批操作越多,速度也快,但是一般一两百条一次比较好。
我使用的一页是50条,处理一个200W条记录的表大约需要半小时。
第三级 分库执行
如果是分库分表情况下,上面的操作仍然是一个库一个库操作的,假如分了8个库,那就需要接近5小时才能完成,速度还是太慢,那该怎么办呢?
我们为每个库分表建立线程,让多个库同时执行,这样的话速度自然会提高不少,大致代码如下:
int shardSize = **Service.queryJdbcShardSize();
final int len = 10000;
long startTime = System.currentTimeMillis();
for (int jdbcShard = 0; jdbcShard < shardSize; jdbcShard++)
int finalJdbcShard = jdbcShard;
log.info("finalJdbcShard = ", jdbcShard);
syncEsExecutor.execute(() ->
long minId = **Service.getShardMinId(finalJdbcShard);
long maxId = **Service.getShardMaxId(finalJdbcShard);
log.info(" jdbcShard=, minId=, maxId=", finalJdbcShard, minId, maxId);
int shardSize = **Service.queryJdbcShardSize();
final int len = 10000;
long startTime = System.currentTimeMillis();
for (int jdbcShard = 0; jdbcShard < shardSize; jdbcShard++)
int finalJdbcShard = jdbcShard;
log.info("finalJdbcShard = ", jdbcShard);
syncEsExecutor.execute(() ->
long minId = **Service.getShardMinId(finalJdbcShard);
long maxId = **Service.getShardMaxId(finalJdbcShard);
log.info("Sync one shard kefu message, jdbcShard=, minId=, maxId=", finalJdbcShard, minId, maxId);
while (true)
List<**Message> shardPage = **Service.getByShardPage(finalJdbcShard, minId, len);
if (minId > maxId)
log.info("Sync one page message end, jdbcShard=, minId=", finalJdbcShard, minId);
break;
List<**IndexItem> messageIndexItems = **Service.***IndexItem(shardPage);
indexService.adds(messageIndexItems);
minId = shardPage.stream().mapToLong(Message::getId).max().getAsLong() + 1;
log.info("Sync one page message end, jdbcShard=, minId=", finalJdbcShard, minId);
);
try
syncEsExecutor.shutdown();
boolean result = syncEsExecutor.awaitTermination(2, TimeUnit.DAYS);
****
);
大致代码如上所示。这样执行的话,一个1600万的数据,大约1小时就你能处理完。
以上是关于如何优雅地用1小时处理1600万条数据库记录的主要内容,如果未能解决你的问题,请参考以下文章