基于anaconda3的引擎供容器调用本地化实践

Posted Faylinn

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于anaconda3的引擎供容器调用本地化实践相关的知识,希望对你有一定的参考价值。

背景: 现有基于anaconda3开发的引擎脚本,用于动态识别处理数据;

场景: 现在这个引擎是服务化,通过网络请求来调用; 我们的调用端是一个容器化的服务,因为引擎的入参出参数据量大,所以现在的服务化方案需要修改,本地化此引擎来减少网络传输的时间,达到优化的目的;

方案思考:

方案一: 将引擎集成到服务镜像中,这样可以本地调用;

方案二: 将引擎打成单独的镜像,放到容器的宿主机上,将存储共同挂载到相同目录,保证容器与引擎容器共享存储,再通过flask暴露接口来启动这个容器;

方案结论:
  • 舍弃了方案一,因为引擎太大了,单独的镜像达到快3G,若集成到服务镜像中,会导致服务镜像过大;且在相同容器中运行,并发性能也有一定的瓶颈(对于服务容器有资源限制);

  • 在方案二中更加的灵活,可以将引擎本地化模块独立开来;因为容器化,所以对于数据处理效率更高,并发性能也更好(可以同时运行多个容器);

实践:

  1. 首先我们需要将引擎打成镜像

    FROM continuumio/anaconda3:latest
    
    COPY /guanlian_v4.0_for_engine_local /engine
    
    ENTRYPOINT ["python", "/engine/process_bm_local.py"]
    
    • 首先基于anaconda3作为基础镜像;

    • 然后将引擎脚本复制到容器中;

    • 使用ENTRYPOINT主要是为了传参,因为我们需要将入参文件的路径传进去,引擎才能知道位置来进行处理;而在引擎运行处理完毕后就会关闭当前容器。

    通过命令打成镜像

    docker build -t engine:v1 .
    
  2. 启动容器进行功能测试

    docker run -v /home/test:/engine_data -it engine:v1 /engine_data/in.txt
    

    查看log,引擎运行正常,最后输出结果:

    finish  in.txt  in.txt_guanlian.log  in.txt_out.json
    

    验证得知容器运行OK

  3. 建立一个接口,来执行docker启动命令,我们使用flask(简单)

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # Author: yxhe
    # Date: 2021/6/9/0009 21:38
    # ----------------------------------------------------------
    
    import os
    import threading
    
    from flask import Flask, request
    
    app = Flask(__name__)
    
    
    @app.route(\'/engine_gl/execute\', methods=[\'POST\'])
    def translate():
        folder = request.args.get("folder")
        file_name = request.args.get("iname")
        base_work_dir = request.args.get("workDir")
        in_path = base_work_dir + \'/\' + folder
        thread = threading.Thread(target=exe, args=(in_path, file_name, ))
        thread.start()
        return \'ok\'
    
    
    def exe(in_path, file_name):
        os.system("docker run -v " + in_path + ":/engine_data -i engine:v1 /engine_data/" + file_name)
        os.system("touch " + in_path + "/finish")
    
    
    if __name__ == \'__main__\':
        app.run(host="0.0.0.0", port=8080, debug=False)
    
    
    • os.system() 方法会阻塞主线程,导致当前请求时间过长。最终可能请求超时;所以这里使用线程异步处理这段逻辑;

    • 异步之后,调用方就无法知道程序什么时候跑完;因为需要进文件夹再拿取输出的文件数据;所以这边写入了finish文件来标识输出文件完毕,可以获取;

    • 这里遇见一个问题,脚本运行的时候

      nohup python test.py &
      

      通过postman调用的时候,发现报错如下:

      the input device is not a TTY
      

      这个错误是因为我们启动docker命令的时候添加了-t配置,去掉即可

  4. 服务内部封装本地化调用代码;

    @Component
    @Slf4j
    public class AssociateEngine {
        @Value("${engine.associate.mount}")
        private String path;
        @Value("${engine.associate.host-mount}")
        private String host_mount;
        @Value("${engine.associate.url}")
        private String engine_url;
        @Value("${engine.associate.polling-times}")
        private int polling_times;
        @Value("${engine.associate.polling-interval}")
        private int polling_interval; // 30s
        @Value("${engine.associate.file-in-name}")
        private String in_name;
        @Value("${engine.associate.file-out-suffix}")
        private String out_suffix;
    
        public String execute(String json) {
            try {
                // 首先创建一个工作目录文件夹
                String folder = String.valueOf(System.currentTimeMillis());
                String workDir = createWorkDir(this.splicingPath(true, path, folder));
                // 将输入参数json写入文件中
                writeStringToFile(json, this.splicingPath(false, workDir, in_name));
                log.info("本地化引擎,数据已写入容器{}, 宿主机:{}",  this.splicingPath(false, workDir, in_name), this.splicingPath(false, host_mount, folder, in_name));
                // 调用Python脚本中的api,触发docker容器运行
                invoke(engine_url + "?folder=%s&iname=%s&workDir=%s", folder, in_name, this.splicingPath(true, host_mount));
                // 这里是一个轮询,来获取执行结果,以查询到finish文件为结束;或者以轮询次数超过最大限制次数为结束
                String pollingResultPath = pollingResult(workDir, in_name);
                // 获取到本地化引擎输出的文件结果目录路径
                log.info("本地化引擎,获取引擎执行结果{}", pollingResultPath);
                if (pollingResultPath == null) {
                    log.error("引擎调用失败!");
                    return null;
                }
                // 读取输出的文件结果并返回
                return readEngineResult(pollingResultPath);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("引擎报错~ ,errorInfo: {}", e.getMessage(), e);
            }
            return null;
        }
    
        private String readEngineResult(String pollingResultPath) throws IOException {
            FileInputStream fin = new FileInputStream(pollingResultPath);
            InputStreamReader reader = new InputStreamReader(fin);
            BufferedReader buffReader = new BufferedReader(reader);
            StringBuilder stringBuilder = new StringBuilder();
            String line;
            while((line = buffReader.readLine())!=null){
                stringBuilder.append(line);
            }
            buffReader.close();
            return stringBuilder.toString();
        }
    
        private String pollingResult(String workDir, String inName) throws InterruptedException {
            int times = 1;
            // 轮询
            for(;;) {
                File file = new File(workDir);
                File[] fs = file.listFiles();
                if (fs == null) {
                    log.error("文件目录错误,目录为空!");
                    return null;
                }
                for (File f : fs) {
                    if (f.getName().equals("finish")) {
                        return this.splicingPath(false, workDir, inName + out_suffix);
                    }
                }
                log.info("本地化引擎,获取引擎执行结果第{}次失败。。。", times);
                if (times >= polling_times) {
                    log.error("轮询时间到,未查询到正确结果!!");
                    return null;
                }
                times++;
                Thread.sleep(polling_interval);
            }
        }
    
        private String createWorkDir(String path) {
            File file = new File(path);
            if (!file.exists()) {
                boolean mkdirs = file.mkdirs();
                if (mkdirs) {
                    return path;
                }
            }
            return path;
        }
    
        private void writeStringToFile(String json, String filePath) throws IOException {
            OutputStream out = new FileOutputStream(filePath);
            out.write(json.getBytes());
            out.flush();
            out.close();
        }
    
        private void invoke(String url, String folder, String inName, String hostMount) {
            HttpClientUtil.post(String.format(url, folder, inName, hostMount));
        }
    
        private String splicingPath(boolean includeFinish, String... paths) {
            StringBuilder stringBuilder = new StringBuilder();
            for (String path : paths) {
                stringBuilder.append(path);
                if (!path.endsWith(File.separator)) {
                    stringBuilder.append(File.separator);
                }
            }
            String finalPath = stringBuilder.toString();
            if (includeFinish) {
                return finalPath;
            }
            return finalPath.substring(0, finalPath.length()-1);
        }
    }
    
结论

至此解决引擎本地化的问题;

以上是关于基于anaconda3的引擎供容器调用本地化实践的主要内容,如果未能解决你的问题,请参考以下文章

一个可供参考的使用Mesos管理虚拟机的实践分享

Airflow实践 | 一款基于python的智能工作流引擎

强力的应用容器引擎——Docker的数据管理(dockerfile等)

Docker初识

中间件业务在网易轻舟容器平台的性能调优实践

Camunda工作流引擎简记