流程控制:分布式并行任务流程控制

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了流程控制:分布式并行任务流程控制相关的知识,希望对你有一定的参考价值。

  • 背景:

目前工作中遇到一个比较急,又有点费事的工作任务:

1)目前系统中已经已经包含了一些比较完善的部分模块,但是模块之间没有一个控制流程来管理,就造成程序没有办法自动化;

2)已经完成模块中有几个是采用分布式部署,但各个服务器之间又是采用的并行执行不同的任务(目的最大化利用服务器,节省处理总耗时):这写对流程化控制带来了一些控制繁琐问题。

3)目前并不需要考虑太多稳定行的问题,但是流程控制程序必须考虑到高可用性(就是需要部署为HA)。

  • 目前已经拥有的功能模块:

1)采集及更新工参、KPI、路测、扫频等到数据库:这些数据在ftp上,因此,每次更新需要从ftp上自动采集数据(当然是任务触发时最新的数据)之后更新到数据库中;

2)采集mr数据及解析mr,这里分为了三个功能模块:

2.1)采集mr功能:目前已经包含,但是功能还是不太完善,每次采集的触发时间应该是当任务触发时开启采集(这样避免了未领取到任务时盲目的采集,造成服务器性能下降其他分析慢,IO存储不足问题),采集距离“采集任务”触发时间最近且完整的数据;

2.2)mr大压缩包解压任务:目前系统中包含的mr数据文件格式比较复杂,可能包含大压缩包套文件,或者达压缩吧套小压缩包的问题,最终需要加压为最小压缩包(也就是在解压一次就是.xml文件);

2.3)mr解析:mr解析是一个分布式部署(部署在多台服务器,按照公式:enb%服务器数量,得到的值来分配enb到具体哪台服务器),但是每台服务器与每台服务器之间是没有联系的:每台服务器只负责并行的处理分配到自己节点上的任务。

3)mr栅格化:由于上边mr解析后分别存储到自己节点服务器上的数据,因此这里的mr栅格化数据也是分布式的部署在每台节点服务器上。

  • 解决方案思路:

为此,我写了一个代码逻辑框架:

技术分享

文件Program.cs是代码核心业务控制:

1)它支持部署HA:可以部署多台服务器,和一台服务器一样工作。

2)核心业务控制思路:

 1         static void Main(string[] args)
 2         {
 3             while (true)
 4             {
 5                 // TaskLock在zookeeper或sqlserver数据中只存在唯一的一条记录。
 6                 TaskLock taskLock = TaskLockBO.Get();
 7 
 8                 if (taskLock.Lock == LockStatus.Locked)
 9                 {
10                     // 获取正在执行的任务。。。。
11                     List<Task> taskItems = TaskBO.GetToDoTaskByTaskGroup(taskLock.DoingTaskGroup);
12 
13                     if (taskItems == null || taskItems.Count == 0)
14                     {
15                         // 修改taskLock.Lock=UnLock、taskLock.DoingTaskGroup=Guid.Empty
16                     }
17                     else
18                     {
19                         // 按照任务的优先级执行task
20                         // 开始调度每台计算服务器上的任务。。。
21                         // 1)如果第一个待执行(todo)的任务是“工参导入或更新”,修改其任务状态为doing。
22                         //       “工参导入或更新”服务只可能部署在一台服务器上或者就是这里实现,当获取到归属自己的任务状态是doing时,就开始从ftp上采集工参数据,并解析导入,完成修改任务状态为done,失败修改任务状态为fail
23                         // 2)如果第一个待执行(todo)的任务是“采集MR”,修改其任务状态为doing。
24                         //       “采集MR”服务也是只可能部署在一台服务器上(但不可能在这里执行),当获取到归属自己的任务状态是doing时,就开始监控ftp,并采集ftp数据到本地,完成修改任务状态为done,失败修改任务状态为fail
25                         // 3)如果第一个待执行(todo)的任务是“解压超大压缩包”,修改其任务状态为doing.
26                         //       “解压超大压缩包”服务也是只可能部署在一台服务器上(但不可能在这里执行),当获取到归属自己的任务状态是doing时,就开始循环遍历采集的mr,若找到超大压缩包就进行超大压缩包解压,完成修改任务状态为done,失败修改任务状态为fail
27                         // 4)如果第一个待执行(todo)的任务是“解析MR”,修改任务状态为predoing,循环遍历ftp目录的数据按照分发规则把mr问价分发到不同的计算节点服务器指定的位置,并创建“解析mr子任务”给每台解析mr服务器,并修改该任务状态为doing
28                         //       注意:这里是分布式处理的,因此给所有子节点分配任务后统一修改所有“解析MR”任务状态为doing(每个compute包含一个“mr解析”任务).
29                         //       “解析MR”服务部署在多个解析处理服务器上,当获取到归属自己的节点的“解析mr”任务状态是doing时,就开始获取自己的节点下的“解析mr子任务”逐个处理,处理完成后修改归属自己的节点“mr解析”任务状态为done,失败修改任务状态为fail
30                         //       注意:这里是分布式处理的,因此需要考虑到等待所有节点“mr解析”都完成后,才可以进行下一步。
31                         // 5)如果第一个待执行(todo)的任务是“mr栅格化”,修改任务状态为doing.
32                         //       注意:这里是分布式处理的,因此给所有子节点分配任务后统一修改所有“MR栅格化”任务状态为doing(每个compute包含一个“MR栅格化”任务).
33                         //       “MR栅格化”服务部署在多个处理服务器上,当获取到归属自己的节点的“mr栅格化”任务状态是doing时,开始逐个处理自己节点上的“mr栅格化”任务,处理完成后修改归属自己的节点“mr栅格化”任务状态为done,失败修改任务状态为fail
34                         //       注意:这里是分布式处理的,因此需要考虑到等待所有节点“mr栅格化”都完成后,才可以进行下一步。
35                     }
36                 }
37                 else if (taskLock.Lock == LockStatus.UnLock)
38                 {
39                     // 尝试获取新的任务。
40                     // 修改taskLock.lock=PreLock
41 
42 
43                     // 添加任务成功,则修改taskLock.Lock=Locked、taskLock.DoingTaskGroup赋值;添加任务失败,则修改taskLock.Lock=UnLock
44                 }
45 
46                 // 5 分钟轮询一次。
47                 Thread.Sleep(5 * 60 * 1000);
48             }
49         }

3)任务状态&类型&定义包含:

    enum TaskType
    {
        /// <summary>
        /// 导入或更新工参、KPI等数据
        /// </summary>
        ImportSiteCellKpi = 0,
        /// <summary>
        /// 采集MR数据
        /// </summary>
        GatherMR = 1,
        /// <summary>
        /// 尝试解压包含多层压缩包的MR数据
        /// </summary>
        DoUnZipMR = 2,        
        /// <summary>
        /// 解析入库MR数据
        /// </summary>
        DoParserMR = 3,
        /// <summary>
        /// MR栅格化
        /// </summary>
        DoMRRaster = 4,
    }
    enum TaskStatus
    {
        Todo = 0,        PreDoing = 1,        Doing = 2,        Done = 3,        Fail = 4
    }
    class Task
    {
        /// <summary>
        /// 如果为同一批次批量处理流程,则TaskGroup为同一个Guid值。
        /// </summary>
        public Guid TaskGroup { get; set; }
        public int TaskId { get; set; }
        public TaskType TaskType { get; set; }
        public TaskStatus TaskStatus { get; set; }
        /// <summary>
        /// 任务优先级
        /// </summary>
        public int Priority { get; set; }
        public string ComputeIP { get; set; }
        public DateTime CreateTime { get; set; }
        public DateTime DoingTime { get; set; }
        public DateTime DoneTime { get; set; }
        public DateTime FailTime { get; set; }
    }

4)任务流程控制锁(全局锁):

    enum LockStatus
    {
        PreLock = 0,
        Locked = 1,
        UnLock = 2
    }
    /// <summary>
    /// 所有任务流程是否被Lock掉
    /// 1)当一批次任务未完成之前就不允许有任何新的一批任务开始执行,
    ///     必须等到一批次任务流程执行完成后才可以执行,否则将会导致数据执行的速度过慢,或者导致数据混乱情况。
    /// 2)整个系统中,要确保正在执行的任务流程只有唯一一个,否则系统将会造成性能底下,或者出现数据错乱情况。
    /// </summary>
    class TaskLock
    {
        public LockStatus Lock { get; set; }
        /// <summary>
        /// 当开始执行新的一批次任务流程是,把该批次任务流程组编号写入此处,同时修改TaskLock.isLock为true.
        /// </summary>
        public Guid DoingTaskGroup { get; set; }
    }

 

代码下载:

链接:http://pan.baidu.com/s/1pKIYwl1 密码:mbl7

以上是关于流程控制:分布式并行任务流程控制的主要内容,如果未能解决你的问题,请参考以下文章

控制异步回调利器 - async 串行series,并行parallel,智能控制auto简介

程序流程控制----方法

Node.js-串行化流程控制

05 流程控制

珠峰培训node正式课笔记 -- async任务流程控制,异步流程控制

PHP中的流程控制