亿级数据迁移的可靠性设计

Posted 金融科技技术与架构

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了亿级数据迁移的可靠性设计相关的知识,希望对你有一定的参考价值。

亿级数据迁移的可靠性设计

背景

会员等级权益系统作为互联网产品会员体系的一个重要系统,帮助运营更加直观的获取用户的数据,进行精准营销。对于会员等级权益系统的搭建有些产品从诞生开始并伴随而生,而对于很多老牌互联网产品,则是在随着产品的演进,业务的需要,在后期扩展进去的。SN金融作为一款老牌的互联网金融产品,其会员等级权益系统也是17年底才补充进去的,总体设计上是通过大数据捞取会员的各项活动数据,汇总分类,再乘上权重,得到会员活动积分,然后依此积分对会员进行分级,并确定会员权益。

设计

由于SN金融拥有亿级的用户量,每个用户定期内会产生多条活动数据,通过大数据部门拉取用户活动数据进行汇总后产生的数据量也是惊人的,如此天量的数据从大数据部门产生后交付到业务部门来使用,如果保证数据迁移的可靠性呢?总体的设计思路分为以下三步:

1,大数据部门拉取汇总的数据通过数据文件的方式交付给业务部门,文件分FLGCSV两种文件,上传FTP服务器

2,业务部门通过定时任务扫描FTP目录内的FLG文件确定该批次数据任务,然后逐一获取所有CSV文件的文件名称,将文件名存入数据库,作为数据迁移的任务列表

3,业务部门通过定时任务扫描数据库中的任务列表,逐个将CSV文件获取解析并上传到业务数据库中


针对以上设计具体内容还包括:

1,大数据部门统计汇总数据是按时间段进行统计的,比如一个月统计一次,然后产生数亿的数据,这些数据不可能存储到一个CSV文件中,会被分别存到数千个CSV文件中

2,如果每次统计的数据文件都放到一个目录内,随着时间的推移,文件会越来越多,势必影响性能和排错。当业务部门完成数据解析后,CSVFLG文件会被移动到备份目录

3,定时任务是不允许放在中台服务端的,在设计上定时任务都是放在后台,然后通过服务端API来完成和业务数据库的交互

4,一次解析数千张数据文件,是每个定时任务解析一个文件还是,一个定时任务解析全部的文件呢?这个要考虑到业务的增长和实时监控,随着数据量的增加,每次需要解析的文件数量是不一样的,如果每个文件一个定时任务需要花费很大的人工进行维护定时任务,而且定时任务太多不利于监控,综合来讲必须采用一个定时任务解析新产生的全量数据文件

5,如果定时任务中断了,重新启动解析任务,对于已经解析完整的数据文件不用再重新解析了,但是对于还没有解析上传完整的数据文件需要重新解析上传,而对于该文件中一部分已经解析上传的数据该怎么处理?此时需要将数据上传接口设置为幂等接口。


以上设计一个定时任务解析所有文件,并上传所有数据,需要考虑一下问题:

1,如何控制上传的速度?如果速度过快会不会压垮服务端数据库,影响到其他业务功能。

2,如果数据文件中的数据格式超过预定义格式怎么处理?需要保证单个数据文件异常不影响其他数据文件的解析上传。

3,如果某单条数据异常该怎么处理?需要保证单条数据的异常不影响其他数据的解析上传。

为了完成以上目标,该定时任务内部逻辑采用生产者消费者模式设计

1,生产者A从库中拉取待处理任务,将任务放到任务队列B

2,消费者C从任务队列B中获取任务,然后到FTP中获取文件,并解析,然后将解析的单条数据放到数据队列D

3,消费者E从数据队列D中获取数据,然后调用服务端上传数据接口,将数据上传到业务数据库中

代码

以下给出核心代码,包括一个线程协调服务,和两个生产者消费者线程代码

1,线程协调服务代码:

publicclass DataFileProcessimplements Observer {

    privatestaticfinal Logger LOGGER = LoggerFactory.getLogger(DataFileProcess.class);

    /**

     * 文件数据处理可配想

     */

    private DataFileProcessConfig config;

 

    /**

     * 文件队列

     */

    privateArrayBlockingQueue<String> fileQueue;

 

    /**

     * 数据队列

     */

    privateArrayBlockingQueue<ProcessData> dataQueue;

 

    /**

     * 文件处理线程

     */

    private FileProcessRunable fileProcessRunable;

 

    /**

     * 数据处理线程

     */

    private DataProcessRunable dataProcessRunable;

 

    publicDataFileProcess(DataFileProcessConfig config) {

        this.config = config;

        this.fileQueue = new ArrayBlockingQueue<String>(100);

        this.dataQueue = new ArrayBlockingQueue<ProcessData>(10);

        this.fileProcessRunable = new FileProcessRunable(this.config, fileQueue, dataQueue);

        this.dataProcessRunable = new DataProcessRunable(this.config, dataQueue);

    }

 

    /**

     * 功能描述: 启动任务<br>

     * 〈功能详细描述〉

     *

     * @see [相关类/方法](可选)

     * @since [产品/模块版本](可选)

     */

    publicvoid process() {

        // 启动文件解析线程

        new Thread(fileProcessRunable).start();

        LOGGER.info("启动文件处理线程,文件处理类型{}-{}", config.getFileType().getType(), config.getFileType().getDescription());

 

        // 启动数据上传线程

        dataProcessRunable.addObserver(this);

        new Thread(dataProcessRunable).start();

        LOGGER.info("启动数据处理线程,文件处理类型{}-{}", config.getFileType().getType(), config.getFileType().getDescription());

 

        // 调用服务端获取所有需要处理的文件名,并加入待处理文件队列

        // 该函数位置勿动

        getFileNames();

    }

 

    /**

     * 功能描述: 获取所有待处理文件名,并加入队列<br>

     */

    protectedvoid getFileNames() {

        FmgsServiceIntegration fmgsIntegration =(FmgsServiceIntegration) SpringContextUtils

                .getBean(FmgsServiceIntegration.class);

        List<String> fileList = fmgsIntegration.getFileList(config.getFileType().getType());

        if (fileList == null) {

            return;

        }

        LOGGER.info("从服务端获取到文件数量为{}", fileList.size());

        for (String fileName : fileList) {

            try {

                fileQueue.put(fileName);

            } catch (InterruptedException e) {

                LOGGER.error("文件队列插入数据错误", e);

            } catch (Exception e) {

                LOGGER.error("系统异常", e);

            }

        }

    }

 

    /**

     * 功能描述: 监听数据处理线程,如果线程长时间(3秒)无法从队列获取数据,会触发该监听事件<br>

     * 监听事件检查两个队列是否同时处于空置状态,如果两次检查两个队列都同时处于空置状态,则停止所有线程,任务结束<br>

     */

    @Override

    publicvoid update(Observable o, Object arg) {

        try {

            // 如果两个队列都为空,则休眠3秒钟再重新检测两个队列,如果还为空,则停止整个任务

            if (fileQueue.size() < 1 && dataQueue.size() < 1) {

                Thread.sleep(3000);

                if (fileQueue.size() < 1 && dataQueue.size() < 1) {

                    config.setThreadRunStatus(false);

                    LOGGER.info("关闭线程,文件数据处理结束...");

                }

            }

        } catch (InterruptedException e) {

            LOGGER.error("系统异常", e);

        }

    }

}

2,任务生产者线程

publicclass FileProcessRunable implements Runnable {

    privatestaticfinal Logger LOGGER = LoggerFactory.getLogger(FileProcessRunable.class);

    privateArrayBlockingQueue<String> fileQueue;

    privateArrayBlockingQueue<ProcessData> dataQueue;

    private DataFileProcessConfig config = null;

    private FTPClient ftpClient = null;

 

    publicFileProcessRunable(DataFileProcessConfig config, ArrayBlockingQueue<String> fileQueue,

           ArrayBlockingQueue<ProcessData> dataQueue) {

        this.fileQueue = fileQueue;

        this.dataQueue = dataQueue;

        this.config = config;

        this.ftpClient = new FTPClient();

    }

 

    @Override

    publicvoid run() {

        while (config.getThreadRunStatus() && config.getThreadRunStatusScm()) {

            try {

                // 1,获取数据文件名,如果长时间(3秒)未获取到文件名,则终止获取并通知监听检查是否终止线程

                String fileName = null;

                try {

                    fileName = fileQueue.poll(3, TimeUnit.SECONDS);

                } catch (InterruptedException e) {

                    LOGGER.error("从文件队里中获取数据失败", e);

                }

                if (StringUtils.isEmpty(fileName)) {

                    LOGGER.info("超过3秒未获取到数据文件名");

                    continue;

                }

 

                // 2,获取ftp中的文件内部数据

                List<String> readFile = null;

                try {

                    readFile = readFile(fileName);

                } catch (Exception e) {

                    LOGGER.error("ftp获取文件{}失败", fileName, e);

                    continue;

                }

                if (CollectionUtils.isEmpty(readFile)) {

                    LOGGER.info("从文件{}中获取到0条数据", fileName);

                    continue;

                }

                LOGGER.info("从文件{}中获取到{}条数据", fileName, readFile.size());

                for (inti = 0; i < readFile.size(); i++) {

                    ProcessData data = new ProcessData();

                    data.setFileName(fileName);

                    data.setRowDataStr(readFile.get(i));

                    if (i == readFile.size() - 1) {

                        data.setLastData(true);

                    } else {

                        data.setLastData(false);

                    }

                    // 3,数据加入待处理队列

                    try {

                        dataQueue.put(data);

                    } catch(InterruptedException e) {

                        LOGGER.error("向数据队里中插入数据{}失败", JSON.toJSON(data), e);

                    }

                }

                // 4,文件移到已处理文件夹

                booleanmoveProcessFile = false;

                try {

                    moveProcessFile = moveProcessFile(fileName);

                } catch (IOException e) {

                    LOGGER.error("文件迁移异常", e);

                }

                LOGGER.info("文件{}位置迁移{}", fileName, moveProcessFile ? "成功" : "失败");

            } catch (Exception e) {

                LOGGER.error("系统异常", e);

            }

        }

    }

 

    /**

     * 功能描述: ftp中获取文件内容<br>

     */

    private List<String>readFile(String fileName) throws IOException {

        // 登录ftp

        ftpLoginIn();

 

        FtpConfigInfoResponse ftpConfigResult = config.getFtpInfo();

        List<String> listData = newArrayList<String>();

        ftpClient.changeWorkingDirectory(ftpConfigResult.getFtpAbsolutePath()+ "toProcess/");

 

        // 读取文件数据

        InputStream input = ftpClient.retrieveFileStream(fileName);

        CsvReader csvReader = new CsvReader(input, Charset.forName("utf-8"));

 

        while (csvReader.readRecord()) {

            String record = csvReader.getRawRecord();

            listData.add(record);

        }

        csvReader.close();

        input.close();

        ftpClient.completePendingCommand();

 

        // 登出ftp

        ftpLoginOut();

 

        returnlistData;

    }

 

    /**

     * 功能描述: ftp文件切换到已处理目录<br>

     */

    privateboolean moveProcessFile(String fileName) throws IOException {

        // 登录ftp

        ftpLoginIn();

 

        FtpConfigInfoResponse ftpConfig = config.getFtpInfo();

        String origPath = ftpConfig.getFtpAbsolutePath().concat(ftpConfig.getToProcess()).concat("/").concat(fileName);

        String destPath = ftpConfig.getFtpAbsolutePath().concat(ftpConfig.getProcessEd()).concat("/").concat(fileName);

        booleanmoveResult = ftpClient.rename(origPath, destPath);

 

        // 登出ftp

        ftpLoginOut();

        returnmoveResult;

    }

 

    /**

     * 功能描述: 登录ftp<br>

     */

    privatevoid ftpLoginIn() throws NumberFormatException,SocketException, IOException {

        if (ftpClient.isConnected()) {

            return;

        }

        FtpConfigInfoResponse ftpConfigResult = config.getFtpInfo();

        ftpClient.connect(ftpConfigResult.getFtpIp(),Integer.parseInt(ftpConfigResult.getFtpPort()));

        booleanloginResult = ftpClient.login(ftpConfigResult.getFtpUserName(), ftpConfigResult.getFtpPassword());

        if (!loginResult) {

            thrownew AppException(ErrorCodeEnum.SYSTEM_ERROR, ftpConfigResult.getFtpIp()+ "打开失败");

        }

    }

 

    /**

     * 功能描述: 登出ftp<br>

     */

    privatevoid ftpLoginOut() {

        try {

            if (ftpClient.isConnected()) {

                ftpClient.logout();

                ftpClient.disconnect();

            }

        } catch (IOException e) {

            LOGGER.error("ftp关闭失败", e);

        }

    }

}

3,数据生产者线程(任务消费者)

publicclass DataProcessRunable extends Observable implements Runnable {

    privatestaticfinal Logger LOGGER = LoggerFactory.getLogger(DataProcessRunable.class);

    private DataFileProcessConfig config;

    ArrayBlockingQueue<ProcessData> dataQueue;

    public DataProcessRunable(DataFileProcessConfig config, ArrayBlockingQueue<ProcessData> dataQueue) {

        this.config = config;

        this.dataQueue = dataQueue;

    }

 

    @Override

    publicvoid run() {

        while (config.getThreadRunStatus() && config.getThreadRunStatusScm()) {

            try {

                // 1,获取待处理数据,如果长时间(3秒)未获取到文件名,则终止获取并通知监听检查是否终止线程

                ProcessData data = null;

                try {

                    data = dataQueue.poll(3, TimeUnit.SECONDS);

                } catch (InterruptedException e) {

                    LOGGER.error("从数据队里中获取数据失败", e);

                }

                if (data == null) {

                    LOGGER.info("超过3秒未获取到待处理数据");

                    setChanged();

                    notifyObservers();

                    continue;

                }

                try {

                    // 2, 处理获取

                    config.getDataProcessService().service(data);

                } catch (AppException e) {

                    LOGGER.error(e.getMessage(),e);

                }

                // 3,每条数据处理后休眠时间

                try {

                    Thread.sleep(config.getPreDataSleepTime());

                } catch (InterruptedException e) {

                    LOGGER.error("系统异常", e);

                }

            } catch (Exception e) {

                LOGGER.error("系统异常", e);

            }

        }

    }

}


以上是关于亿级数据迁移的可靠性设计的主要内容,如果未能解决你的问题,请参考以下文章

MYSQL 到MYSQL 分表数据迁移

MYSQL 到MYSQL 分表数据迁移

大体量数据迁移思路

取代数仓!就在刚刚,公司宣布1年内完成向数据中台的迁移…

取代数仓!就在刚刚,公司宣布1年内完成向数据中台的迁移…

数据迁移 - 如何快速迁移