技术干货 | 百万级商品数据实时同步,查询结果秒出

Posted 微微科技公司

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了技术干货 | 百万级商品数据实时同步,查询结果秒出相关的知识,希望对你有一定的参考价值。

前阵子老板安排了一个新任务,要建设一个商家商品搜索系统,能够为用户提供快速、准确的搜索能力,在用户输入搜索内容时,要能从商家名称和商品名称两个维度去搜索,搜索出来的结果,按照准确率排序,并按商家所属商品的关联关系,来组合数据结构,同时提供API给业务系统调用。

背景很简单,现实蛮复杂!我们面临以下几个难题:
①商家数据库和商品数据库是多台不同的服务器,并且数据量达百万级,如何才能实现跨数据库的数据同步呢?

②商家和商品的数据是有从属关系的,不然就会把肯德基的香辣鸡腿堡挂到麦当劳去,这就尴尬了!

③商家商品数据是经常更新的,比如修改价格、库存、上下架等,那搜索服务可不能搜出一堆过时的数据,如果客户明明搜出来的商品,点进去后却已下架了,那么客户就要吐槽了!如何实现搜索数据与源数据库增删改均实时同步呢?

带着以上3个问题,我们开始了搜索服务的整体架构设计。

系统架构设计思路
为了设计出合适的系统架构,我们分析了现状。
首先,商家数据和商品数据分别存储在2个独立的mysql8数据库,为满足商家数据和商品数据的关联,我们需要将两个库中所需要的表实时ETL到我们的搜索系统数据库。

其次,数据从商家、商品数据库ETL到搜索系统数据库后,需要实时的组合成为商家关联商品数据结构,并以父子文档的格式,存储到ES中。

最后,商家、商品数据库的增删改操作,需要实时的同步到ES中,也就是ES中的数据,需要支持实时的增加、删除和修改。

为此,我们设计了2个canal组件,第一个canal实现数据ETL,把商家、商品数据库的某些表及字段,抽取到搜索服务数据库;再利用第二个canal,读取搜索服务MySQL数据库的binlog,实时传输到kafka消息队列,再由canal adapter对数据进行关联、父子文档映射等,将处理好的数据存储到ElasticSearch中。

具体系统架构设计如下图所示。

商家商品搜索系统架构设计


项目实战

1、环境及软件说明

操作系统:CentOS 7
canal:canal.adapter-1.1.4,canal.deployer-1.1.4
kafka:kafka_2.12-2.3.0
ElasticSearch:elasticsearch-6.3.2
kibana:kibana-6.3.2

2、利用Canal实现数据ETL到MySQL8
这个步骤是利用canal从2个独立的MySQL8数据库中,抽取需要的表到搜索服务的MySQL数据库。

2.1 安装canaldeployer

(1)解压canal.deployer-1.1.4.tar.gz
(2)配置canal deployer
进入canaldeployer/conf目录,修改canal.properties文件,主要配置serverMode、MQ和destination三部分。
首先,我们serverMode修改为kafka模式,增加系统缓冲能力以及提高系统稳定性:

serverMode


接着,配置kafka的MQ信息(kafka请自行安装):

kafka MQ信息


最后,配置需要实例化的instance,这里配置了3个,表示canal deploy会启动这3个实例,同步MySQL的binlog到kafka的topic内。如下图所示:

destinations实例配置


(3)配置canal deployer instance
进入canaldeployer/conf/example目录,发现有一个instance.properties文件,这是canal给的示例,我们可以参考其配置。
①我们拷贝整个example目录,并重命名为上个步骤配置的destination之一,如xxxsearch;
②进入xxxsearch目录,编辑instance.properties文件,主要配置源数据库信息、所需数据表及字段,以及指定kafka的topic名,这样源数据库的binlog就会转换为json数据,并实时的通过canal deployer传输到kafka该topic中。如下所示:

canaldeploy instance 源数据库配置


canaldeploy instance kafka topic配置


③进入canaldeployer/bin目录,执行./startup.sh,启动canal deployer及所属实例。
至此canal deployer搭建完成。

2.2 安装canal.adapter

我们需要利用canal.adapter将kafka topic中的binlog json数据,经过清洗转换等操作,存储到MySQL8中。由于canal原生是不支持MySQL8的,故我们需要做一些调整。
(1)增加MySQL8连接驱动
解压canal.adapter-1.1.4.tar.gz,进入canaladapter/lib目录,移除mysql-connector-java-5.1.40.jar,导入mysql-connector-java-8.0.18.jar

(2)配置canal adapter,使数据输出到MySQL8。
进入canaladapter/conf目录,编辑application.yml文件,主要配置消费kafka、源数据库信息和搜索系统数据库信息,如下所示:

ETL到MySQL8配置


接着,进入canaladapter/conf/rdb目录,以官方提供的mytest_user.yml为例,配置kafka topic名、源数据库名、源数据表名,以及目标数据库名和目标数据表名,建议一张表对应一个yml文件

ETL表结构映射配置


(3)启动canaladapter
进入canaladapter/bin目录,执行./startup.sh,启动canal adapter,观察logs/adapter/adapter.log日志文件,手动在搜索系统数据库新增一条记录,看是否会打印如下日志,即有2条记录,一条INFO,一条DEBUG,则表示配置成功。
canaladapter日志
至此,数据ETL阶段搭建完成,数据可从两个不同的MySQL8数据库,实时同步到搜索服务的MySQL数据库。

3、实现数据多表关联、父子文档映射
(1)配置第二个canal的canaladapter
进入canaladapter/conf目录,编辑application.yml文件,主要配置消费kafka、搜索系统数据库,和ES连接信息,如下所示:

canaladapter MQ及mysql配置


canaladapter ES配置


(2)配置多表关联
进入canaladapter/conf/es目录,vim mytest_user.yml,编辑多表关联配置:

多表关联配置

注意,sql支持多表关联自由组合, 但是有一定的限制:
(a)主表不能为子查询语句
(b)只能使用left outer join即最左表一定要是主表
(c)关联从表如果是子查询不能有多张表
(d)主sql中不能有where查询条件(从表子查询中可以有where条件但是不推荐, 可能会造成数据同步的不一致, 比如修改了where条件中的字段内容)
(e)关联条件只允许主外键的\'=\'操作不能出现其他常量判断比如: on a.role_id=b.id and b.statues=1
(f)关联条件必须要有一个字段出现在主查询语句中比如: on a.role_id=b.id  其中的 a.role_id 或者 b.id 必须出现在主select语句中
(g)Elastic Search的mapping 属性与sql的查询值将一一对应(不支持 select *), 比如: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将映射到mapping的_email field, 这里以别名(如果有别名)作为最终的映射字段. 这里的_id可以填写到配置文件的 _id: _id映射.
(3)配置父子文档
以官方的biz_order.yml为例,vim biz_order.yml,配置父子文档映射:

配置父子文档映射


(4)在ElasticSearch6中,建立index和父子文档映射关系
进入kibana页面,点击Dev Tools,执行如下命令,即可建立索引及父子文档映射:
建立index和父子文档映射
其中,ES6和kibana的安装,在此无特别配置,不做赘述。

(5)启动canal adapter
进入canaladapter/bin目录,执行./startup.sh,启动canal adapter,观察logs/adapter/adapter.log日志文件,手动在搜索系统数据库新增一条记录,看是否会打印如下日志,如打印则表示配置成功。
正确配置adapter日志示例

4、运行结果
现在,我们可以通过kibana来执行DSL语句来查询看看。
我们事先已在商家系统中增加了一个“肯德基”商店,然后在商品系统中添加了“西红柿”和”新鲜西红柿“2个商品,并将商品关联到“肯德基”上。接着我们查询”肯德基“或者“西红柿”,得到以下是查询的结果(去除了ES默认字段):
通过DSL查询的结果
由图可见,我们可以通过商家名查询商品,也可通过商品名查询商店和商品,并且canal支持数据的实时增删改,所以ES的数据也会与商家系统和商品系统保持一致,同时数据结构包含商家及对应的商品,满足业务需求。

5、总结
至此,基于Canal、kafka、MySQL8、ElasticSearch6技术的商家商品搜索系统基础框架搭建完成。我们采用canal deployer实时读取商家、商品系统的MySQL数据库binlog,并发送至kafka,接着由canal adapter消费kafka,并将binlog json数据进行多表关联、父子文档映射,最后存储到ES6中,供上层搜索服务调用。
搜索服务系统最终成功上线,为公司百万级商家商品提供实时数据同步,秒级搜索结果展示,达到业务要求,老板说了,给研发团队每人加个鸡腿!想想还有点小激动,嘿嘿~~

性能爆表:SpringBoot利用ThreadPoolTaskExecutor批量插入百万级数据实测!

点击关注公众号,实用技术文章及时了解

前言

开发目的:

提高百万级数据插入效率。

采取方案:

利用ThreadPoolTaskExecutor多线程批量插入。

采用技术:

  • springboot2.1.1

  • mybatisPlus3.0.6

  • swagger2.5.0

  • Lombok1.18.4

  • postgresql

  • ThreadPoolTaskExecutor

具体实现细节

application-dev.properties添加线程池配置信息

# 异步线程配置
# 配置核心线程数
async.executor.thread.core_pool_size = 30
# 配置最大线程数
async.executor.thread.max_pool_size = 30
# 配置队列大小
async.executor.thread.queue_capacity = 99988
# 配置线程池中的线程的名称前缀
async.executor.thread.name.prefix = async-importDB-

spring容器注入线程池bean对象

@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig 
    @Value("$async.executor.thread.core_pool_size")
    private int corePoolSize;
    @Value("$async.executor.thread.max_pool_size")
    private int maxPoolSize;
    @Value("$async.executor.thread.queue_capacity")
    private int queueCapacity;
    @Value("$async.executor.thread.name.prefix")
    private String namePrefix;
 
    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() 
        log.warn("start asyncServiceExecutor");
        //在这里修改
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        //配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //配置队列大小
        executor.setQueueCapacity(queueCapacity);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    

创建异步线程 业务类

@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService 
@Override
    @Async("asyncServiceExecutor")
    public void executeAsync(List<LogOutputResult> logOutputResults, LogOutputResultMapper logOutputResultMapper, CountDownLatch countDownLatch) 
        try
            log.warn("start executeAsync");
            //异步线程要做的事情
            logOutputResultMapper.addLogOutputResultBatch(logOutputResults);
            log.warn("end executeAsync");
        finally 
            countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放
        
    

创建多线程批量插入具体业务方法

@Override
public int testMultiThread() 
    List<LogOutputResult> logOutputResults = getTestData();
    //测试每100条数据插入开一个线程
    List<List<LogOutputResult>> lists = ConvertHandler.splitList(logOutputResults, 100);
    CountDownLatch countDownLatch = new CountDownLatch(lists.size());
    for (List<LogOutputResult> listSub:lists) 
        asyncService.executeAsync(listSub, logOutputResultMapper,countDownLatch);
    
    try 
        countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
        // 这样就可以在下面拿到所有线程执行完的集合结果
     catch (Exception e) 
        log.error("阻塞异常:"+e.getMessage());
    
    return logOutputResults.size();

模拟2000003 条数据进行测试

多线程 测试 2000003  耗时如下:耗时1.67分钟

本次开启30个线程,截图如下:

单线程测试2000003  耗时如下:耗时5.75分钟

检查多线程入库的数据,检查是否存在重复入库的问题:

根据id分组,查看是否有id重复的数据,通过sql语句检查,没有发现重复入库的问题

检查数据完整性:

通过sql语句查询,多线程录入数据完整

测试结果

不同线程数测试:

总结

通过以上测试案列,同样是导入2000003  条数据,多线程耗时1.67分钟,单线程耗时5.75分钟。通过对不同线程数的测试,发现不是线程数越多越好,具体多少合适,网上有一个不成文的算法:

CPU核心数量*2 +2 个线程。

附:测试电脑配置

来源:azdebug.blog.csdn.net/article/

details/103697108

推荐
Java面试题宝典
技术内卷群,一起来学习!!


PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。点“在看”支持我们吧!

以上是关于技术干货 | 百万级商品数据实时同步,查询结果秒出的主要内容,如果未能解决你的问题,请参考以下文章

MySQL百万级数据表or查询优化

性能爆表:SpringBoot利用ThreadPoolTaskExecutor批量插入百万级数据实测!

MySQL 百万级数据,怎么做分页查询?

MySQL百万级千万级数据多表关联SQL语句调优

MySQL百万级千万级数据多表关联SQL语句调优

技术流MySQL百万级数据库查询优化技巧