流程控制:分布式并行任务流程控制
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了流程控制:分布式并行任务流程控制相关的知识,希望对你有一定的参考价值。
- 背景:
目前工作中遇到一个比较急,又有点费事的工作任务:
1)目前系统中已经已经包含了一些比较完善的部分模块,但是模块之间没有一个控制流程来管理,就造成程序没有办法自动化;
2)已经完成模块中有几个是采用分布式部署,但各个服务器之间又是采用的并行执行不同的任务(目的最大化利用服务器,节省处理总耗时):这写对流程化控制带来了一些控制繁琐问题。
3)目前并不需要考虑太多稳定行的问题,但是流程控制程序必须考虑到高可用性(就是需要部署为HA)。
- 目前已经拥有的功能模块:
1)采集及更新工参、KPI、路测、扫频等到数据库:这些数据在ftp上,因此,每次更新需要从ftp上自动采集数据(当然是任务触发时最新的数据)之后更新到数据库中;
2)采集mr数据及解析mr,这里分为了三个功能模块:
2.1)采集mr功能:目前已经包含,但是功能还是不太完善,每次采集的触发时间应该是当任务触发时开启采集(这样避免了未领取到任务时盲目的采集,造成服务器性能下降其他分析慢,IO存储不足问题),采集距离“采集任务”触发时间最近且完整的数据;
2.2)mr大压缩包解压任务:目前系统中包含的mr数据文件格式比较复杂,可能包含大压缩包套文件,或者达压缩吧套小压缩包的问题,最终需要加压为最小压缩包(也就是在解压一次就是.xml文件);
2.3)mr解析:mr解析是一个分布式部署(部署在多台服务器,按照公式:enb%服务器数量,得到的值来分配enb到具体哪台服务器),但是每台服务器与每台服务器之间是没有联系的:每台服务器只负责并行的处理分配到自己节点上的任务。
3)mr栅格化:由于上边mr解析后分别存储到自己节点服务器上的数据,因此这里的mr栅格化数据也是分布式的部署在每台节点服务器上。
- 解决方案思路:
为此,我写了一个代码逻辑框架:
文件Program.cs是代码核心业务控制:
1)它支持部署HA:可以部署多台服务器,和一台服务器一样工作。
2)核心业务控制思路:
1 static void Main(string[] args) 2 { 3 while (true) 4 { 5 // TaskLock在zookeeper或sqlserver数据中只存在唯一的一条记录。 6 TaskLock taskLock = TaskLockBO.Get(); 7 8 if (taskLock.Lock == LockStatus.Locked) 9 { 10 // 获取正在执行的任务。。。。 11 List<Task> taskItems = TaskBO.GetToDoTaskByTaskGroup(taskLock.DoingTaskGroup); 12 13 if (taskItems == null || taskItems.Count == 0) 14 { 15 // 修改taskLock.Lock=UnLock、taskLock.DoingTaskGroup=Guid.Empty 16 } 17 else 18 { 19 // 按照任务的优先级执行task 20 // 开始调度每台计算服务器上的任务。。。 21 // 1)如果第一个待执行(todo)的任务是“工参导入或更新”,修改其任务状态为doing。 22 // “工参导入或更新”服务只可能部署在一台服务器上或者就是这里实现,当获取到归属自己的任务状态是doing时,就开始从ftp上采集工参数据,并解析导入,完成修改任务状态为done,失败修改任务状态为fail 23 // 2)如果第一个待执行(todo)的任务是“采集MR”,修改其任务状态为doing。 24 // “采集MR”服务也是只可能部署在一台服务器上(但不可能在这里执行),当获取到归属自己的任务状态是doing时,就开始监控ftp,并采集ftp数据到本地,完成修改任务状态为done,失败修改任务状态为fail 25 // 3)如果第一个待执行(todo)的任务是“解压超大压缩包”,修改其任务状态为doing. 26 // “解压超大压缩包”服务也是只可能部署在一台服务器上(但不可能在这里执行),当获取到归属自己的任务状态是doing时,就开始循环遍历采集的mr,若找到超大压缩包就进行超大压缩包解压,完成修改任务状态为done,失败修改任务状态为fail 27 // 4)如果第一个待执行(todo)的任务是“解析MR”,修改任务状态为predoing,循环遍历ftp目录的数据按照分发规则把mr问价分发到不同的计算节点服务器指定的位置,并创建“解析mr子任务”给每台解析mr服务器,并修改该任务状态为doing 28 // 注意:这里是分布式处理的,因此给所有子节点分配任务后统一修改所有“解析MR”任务状态为doing(每个compute包含一个“mr解析”任务). 29 // “解析MR”服务部署在多个解析处理服务器上,当获取到归属自己的节点的“解析mr”任务状态是doing时,就开始获取自己的节点下的“解析mr子任务”逐个处理,处理完成后修改归属自己的节点“mr解析”任务状态为done,失败修改任务状态为fail 30 // 注意:这里是分布式处理的,因此需要考虑到等待所有节点“mr解析”都完成后,才可以进行下一步。 31 // 5)如果第一个待执行(todo)的任务是“mr栅格化”,修改任务状态为doing. 32 // 注意:这里是分布式处理的,因此给所有子节点分配任务后统一修改所有“MR栅格化”任务状态为doing(每个compute包含一个“MR栅格化”任务). 33 // “MR栅格化”服务部署在多个处理服务器上,当获取到归属自己的节点的“mr栅格化”任务状态是doing时,开始逐个处理自己节点上的“mr栅格化”任务,处理完成后修改归属自己的节点“mr栅格化”任务状态为done,失败修改任务状态为fail 34 // 注意:这里是分布式处理的,因此需要考虑到等待所有节点“mr栅格化”都完成后,才可以进行下一步。 35 } 36 } 37 else if (taskLock.Lock == LockStatus.UnLock) 38 { 39 // 尝试获取新的任务。 40 // 修改taskLock.lock=PreLock 41 42 43 // 添加任务成功,则修改taskLock.Lock=Locked、taskLock.DoingTaskGroup赋值;添加任务失败,则修改taskLock.Lock=UnLock 44 } 45 46 // 5 分钟轮询一次。 47 Thread.Sleep(5 * 60 * 1000); 48 } 49 }
3)任务状态&类型&定义包含:
enum TaskType { /// <summary> /// 导入或更新工参、KPI等数据 /// </summary> ImportSiteCellKpi = 0, /// <summary> /// 采集MR数据 /// </summary> GatherMR = 1, /// <summary> /// 尝试解压包含多层压缩包的MR数据 /// </summary> DoUnZipMR = 2, /// <summary> /// 解析入库MR数据 /// </summary> DoParserMR = 3, /// <summary> /// MR栅格化 /// </summary> DoMRRaster = 4, } enum TaskStatus { Todo = 0, PreDoing = 1, Doing = 2, Done = 3, Fail = 4 } class Task { /// <summary> /// 如果为同一批次批量处理流程,则TaskGroup为同一个Guid值。 /// </summary> public Guid TaskGroup { get; set; } public int TaskId { get; set; } public TaskType TaskType { get; set; } public TaskStatus TaskStatus { get; set; } /// <summary> /// 任务优先级 /// </summary> public int Priority { get; set; } public string ComputeIP { get; set; } public DateTime CreateTime { get; set; } public DateTime DoingTime { get; set; } public DateTime DoneTime { get; set; } public DateTime FailTime { get; set; } }
4)任务流程控制锁(全局锁):
enum LockStatus { PreLock = 0, Locked = 1, UnLock = 2 }
/// <summary> /// 所有任务流程是否被Lock掉 /// 1)当一批次任务未完成之前就不允许有任何新的一批任务开始执行, /// 必须等到一批次任务流程执行完成后才可以执行,否则将会导致数据执行的速度过慢,或者导致数据混乱情况。 /// 2)整个系统中,要确保正在执行的任务流程只有唯一一个,否则系统将会造成性能底下,或者出现数据错乱情况。 /// </summary> class TaskLock { public LockStatus Lock { get; set; } /// <summary> /// 当开始执行新的一批次任务流程是,把该批次任务流程组编号写入此处,同时修改TaskLock.isLock为true. /// </summary> public Guid DoingTaskGroup { get; set; } }
代码下载:
链接:http://pan.baidu.com/s/1pKIYwl1 密码:mbl7
以上是关于流程控制:分布式并行任务流程控制的主要内容,如果未能解决你的问题,请参考以下文章
控制异步回调利器 - async 串行series,并行parallel,智能控制auto简介