Datax-web 源码阅读记录

Posted 终回首

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Datax-web 源码阅读记录相关的知识,希望对你有一定的参考价值。

Datax-web 源码阅读;搭建Datax-web开发环境可以参考上一篇博客Datax-web 二次开发环境配置

一、Datax-web简介

DataX Web是在DataX之上开发的开源分布式数据同步工具,提供简单易用的 操作界面,降低用户使用DataX的学习成本,缩短任务配置时间,避免配置过程中出错。

Datax Web基于xxl-job;Datax Web核心是拼接Datax的json、执行Datax脚本部分代码。

项目地址

Datax-web https://gitee.com/WeiYe-Jing/datax-web
Datax-web https://github.com/alibaba/DataX

软件版本:

Datax 3.0
Datax-web 2.1.2

commit信息

commit f0aac36b6f3c5c6182b8985bd0bcf1470201e92f (HEAD -> master, origin/master, origin/HEAD)
Author: WeiYe <33245094+WeiYe-Jing@users.noreply.github.com>
Date:   Tue Mar 23 21:42:58 2021 +0800

    Update README.md

框架版本:

Spring Boot 2.1.4.RELEASE
Mybatis Plus 3.3.1

二、源码阅读

按照调用顺序阅读

1 datax-admin(调度中心)启动 执行流程

1.1 调度中心启动类 DataXAdminApplication.main()

    public static void main(String[] args) throws UnknownHostException {
        Environment env = new SpringApplication(DataXAdminApplication.class).run(args).getEnvironment();
        String envPort = env.getProperty("server.port");
        String envContext = env.getProperty("server.contextPath");
        // 配置文件,默认端口是8080
        String port = envPort == null ? "8080" : envPort;
        String context = envContext == null ? "" : envContext;
        String path = port + "" + context + "/doc.html";
        String externalAPI = InetAddress.getLocalHost().getHostAddress();
        // 打印3个url,第一个和第二个我这里打开是一样的都是swagger接口文档,第三个是页面的访问地址
        logger.info(
                "Access URLs:\\n----------------------------------------------------------\\n\\t"
                        + "Local-API: \\t\\thttp://127.0.0.1:{}\\n\\t"
                        + "External-API: \\thttp://{}:{}\\n\\t"
                        + "web-URL: \\t\\thttp://127.0.0.1:{}/index.html\\n\\t----------------------------------------------------------",
                path, externalAPI, path, port);
    }

1.2 配置类 JobAdminConfig.afterPropertiesSet()

读取配置文件,初始化bean时会自动调用afterPropertiesSet()初始化JobScheduler类。InitializingBean接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候会自动执行该方法。这个方法里初始化JobScheduler对象,同时调用init方法

	@Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;

        xxlJobScheduler = new JobScheduler();
        // 初始化任务调度器
        xxlJobScheduler.init();
    }

1.3 调度器初始化 JobScheduler.init()

public void init() throws Exception {
        // 1 初始化页面国际化的工具
        initI18n();

        // 2 启动注册监控线程
        JobRegistryMonitorHelper.getInstance().start();

        // 3 启动失败监控线程
        JobFailMonitorHelper.getInstance().start();

        // 4 初始化触发线程池,创建快慢线程池
        JobTriggerPoolHelper.toStart();

        // 5 启动日志线程
        JobLogReportHelper.getInstance().start();

        // 6 启动作业调度器,这个类是主要逻辑.启动一个死循环,不断的遍历任务触发执行。为避免cpu飙升,隔一会睡一会(这里是重点)
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init datax-web admin success.");
    }

1.5 调度器真正初始化 JobScheduleHelper.getInstance().start()

public void start() {

        // 启动一个调度线程
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                	// 睡大约4秒
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>> init datax-web admin scheduler success.");

                // 预读取数量,等于快线程池大小加慢线程池大小的和再乘以20,默认的话是(200+100)*20 = 6000
                int preReadCount = (JobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + JobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
				//死循环,进程退出时修改变量为true
                while (!scheduleThreadToStop) {

                    // 从数据库查询job
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

                    boolean preReadSuc = true;
                    try {

                        conn = JobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);
						// 获取一个悲观锁
                        preparedStatement = conn.prepareStatement("select * from job_lock where lock_name = 'schedule_lock' for update");
                        preparedStatement.execute();

                        // tx start

                        // 1、pre read
                        long nowTime = System.currentTimeMillis();
                        // 查询要执行的任务,如何判断要执行呢,即下次执行时间小于等于当前时间+5s
                        List<JobInfo> scheduleList = JobAdminConfig.getAdminConfig().getJobInfoMapper().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList != null && scheduleList.size() > 0) {
                            // 2、push time-ring
                            for (JobInfo jobInfo : scheduleList) {

                                // 判断下次执行时间是否小于(nowTime-5s),如果为true说明任务还没到执行时间跳过此次执行,刷新下次执行时间
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> datax-web, schedule misfire, jobId = " + jobInfo.getId());

                                    // fresh next
                                    refreshNextValidTime(jobInfo, new Date());
								// 判断下次执行时间是否刚过去5s内,如果是,将任务添加到触发执行线程池,刷新下次执行时间;计算执行秒数;将秒和任务id添加到ringData;刷新下次执行时间
                                } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                                    // 触发任务
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
                                    logger.debug(">>>>>>>>>>> datax-web, schedule push trigger : jobId = " + jobInfo.getId());

                                    // 刷新下次执行时间
                                    refreshNextValidTime(jobInfo, new Date());

                                    // next-trigger-time in 5s, pre-read again
                                    if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                        // 1、make ring second
                                        int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);

                                        // 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3、fresh next
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                    }

                                } else {
                                    // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

                                    // 1、make ring second
                                    int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);

                                    // 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3、fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                }

                            }

                            // 3、update trigger info
                            for (JobInfo jobInfo : scheduleList) {
                                JobAdminConfig.getAdminConfig().getJobInfoMapper().scheduleUpdate(jobInfo);
                            }

                        } else {
                            preReadSuc = false;
                        }

                        // tx stop


                    } catch (Exception e) {
                        if (!scheduleThreadToStop) {
                            logger.error(">>>>>>>>>>> datax-web, JobScheduleHelper#scheduleThread error:{}", e);
                        }
                    } finally {

                        // 手动提交修改,关闭连接
                        if (conn != null) {
                            try {
                                conn.commit();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.setAutoCommit(connAutoCommit);
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }

                        // close PreparedStatement
                        if (null != preparedStatement) {
                            try {
                                preparedStatement.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
                    }
                    long cost = System.currentTimeMillis() - start;


                    // Wait seconds, align second
                    if (cost < 1000) {  // 扫描时间超过1秒就不sleep,小于1秒才sleep
                        try {
                            // 如果前面预读成功,睡1秒,预读失败睡大约4秒
                            TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }

                }

                logger.info(">>>>>>>>>>> datax-web, JobScheduleHelper#scheduleThread stop");
            }
        });
        // 设置守护进程
        scheduleThread.setDaemon(true);
        scheduleThread.setName("datax-web, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();


        // 启动处理之前加入到ringData中数据的线程(前面只是保存到ringData,这里是真正开始触发执行)
        ringThread = new Thread(() -> {

            // align second
            try {
            	// 睡(0~1)秒
                TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
            } catch (InterruptedException e) {
                if (!ringThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
			// 死循环,只有程序退出时才修改标志为true
            while (!ringThreadToStop) {

                try {
                    // second data
                    List<Integer> ringItemData = new ArrayList<>();
                    int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                    // 这里就是处理之前添加到ringData里的任务;前面存ringData时,key是秒(从1到60),value是由jobid组成的list;每次从ringData里取2秒的数据。
                    for (int i = 0; i < 2; i++) {
                        List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);
                        if (tmpData != null) {
                            ringItemData.addAll(tmpData);
                        }
                    }

                    // ring trigger
                    logger.debug(">>>>>>>>>>> datax-web, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData));
                    if (ringItemData.size() > 0) {
                        // do trigger
                        for (int jobId : ringItemData) {
                            // 依次调用JobTriggerPoolHelper.trigger()触发任务
                            JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
                        }
                        // 执行完后清空列表
                        ringItemData.clear();
                    }
                } catch (Exception e) {
                	// 打印非停止状态下的异常
                    if (!ringThreadToStop) {
                        logger.error(">>>>>>>>>>> datax-web, JobScheduleHelper#ringThread error:{}", e);
                    }
                }

                // 睡(0-1)秒
                try {
                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                } catch (InterruptedException e) {
                    if (!ringThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> datax-web, JobScheduleHelper#ringThread stop");
        });
        ringThread.setDaemon(true);
        ringThread.setName("datax-web, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }

到这里调度中心就启动成功了,下面再看执行器的执行流程

2 datax-executor(执行器)启动 执行流程

Datax-web入门配置与启动

Datax-web 二次开发环境配置

超实用的php代码片段

Python代码阅读(第19篇):合并多个字典

muduo2.0源码阅读记录

Python代码阅读(第26篇):将列表映射成字典