DolphinScheduler Source Code Analysis
Posted by 海豚调度
This analysis is based on version 1.2.0 and diverges from the latest implementation in places, so please read it critically; some details may not be described precisely and are for reference only.
The master picks up the next command to execute with a query along the following lines:
SELECT command.*
FROM t_ds_command command
JOIN t_ds_process_definition definition
ON command.process_definition_id = definition.id
WHERE definition.release_state = 1
AND definition.flag = 1
ORDER BY command.update_time ASC
LIMIT 1
A single task execution occupies one thread each on the master and on the worker, which is not great. On closer inspection, one task execution actually ties up two master threads.
- dolphinscheduler-ui: the frontend pages module
- dolphinscheduler-server: the core module, including the master, the worker, and related features
- dolphinscheduler-common: the common module, shared methods and classes
- dolphinscheduler-api: the RESTful interface, the layer between frontend and backend that talks to the master/worker
- dolphinscheduler-dao: the data access layer, entity definitions and persistence
- dolphinscheduler-alert: the alerting module, alert-related methods and features
- dolphinscheduler-rpc: log viewing, provides the RPC for viewing logs in real time
- dolphinscheduler-dist: the build and distribution module, no business logic
- The UI is not analyzed
- Start from the API module that the UI interacts with
- Focus the analysis on the core functionality
- Only skim the non-core features
@SpringBootApplication
@ConditionalOnProperty(prefix = "server", name = "is-combined-server", havingValue = "true")
@ServletComponentScan
@ComponentScan("org.apache.dolphinscheduler")
@Import({MasterServer.class, WorkerServer.class})
@EnableSwagger2
public class CombinedApplicationServer extends SpringBootServletInitializer {
public static void main(String[] args) throws Exception {
ApiApplicationServer.main(args);
LoggerServer server = new LoggerServer();
server.start();
AlertServer alertServer = AlertServer.getInstance();
alertServer.start();
}
}
It is hard to see the point of this combined embedding; wouldn't embedding these servers directly be simpler?
public Result createProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "name", required = true) String name,
@RequestParam(value = "processDefinitionJson", required = true) String json,
@RequestParam(value = "locations", required = true) String locations,
@RequestParam(value = "connects", required = true) String connects,
@RequestParam(value = "description", required = false) String description) {
try {
logger.info("login user {}, create process definition, project name: {}, process definition name: {}, " +
"process_definition_json: {}, desc: {} locations:{}, connects:{}",
loginUser.getUserName(), projectName, name, json, description, locations, connects);
Map<String, Object> result = processDefinitionService.createProcessDefinition(loginUser, projectName, name, json,
description, locations, connects);
return returnDataList(result);
} catch (Exception e) {
logger.error(Status.CREATE_PROCESS_DEFINITION.getMsg(), e);
return error(Status.CREATE_PROCESS_DEFINITION.getCode(), Status.CREATE_PROCESS_DEFINITION.getMsg());
}
}
But this code blurs the boundary between the controller and the service: who shapes the HTTP response? Here it is the service that does it; the service creates a result field of type Map&lt;String, Object&gt; and then calls result.put("processDefinitionId", processDefine.getId()) to set the data that is ultimately returned. Personally I cannot agree with this approach. Strictly speaking, the service should only return business entities, and what the HTTP response contains should be decided by the controller.
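As a minimal sketch of the separation argued for here (Result.success and the simplified classes below are hypothetical stand-ins, not DolphinScheduler's real API), the service would return the entity and leave response shaping to the controller:
import java.util.HashMap;
import java.util.Map;
// Hypothetical sketch of a cleaner controller/service split.
class ProcessDefinitionEntity {
    private int id;
    int getId() { return id; }
}
class Result {
    static Result success(Map<String, Object> data) {
        // wrap data together with a SUCCESS status code
        return new Result();
    }
}
class ProcessDefinitionServiceSketch {
    // the service only validates, persists, and returns the business entity
    ProcessDefinitionEntity createProcessDefinition(String projectName, String name, String json) {
        ProcessDefinitionEntity definition = new ProcessDefinitionEntity();
        // ... validate the JSON, insert into the database ...
        return definition;
    }
}
class ProcessDefinitionControllerSketch {
    private final ProcessDefinitionServiceSketch service = new ProcessDefinitionServiceSketch();
    // the controller alone decides what the HTTP response looks like
    Result createProcessDefinition(String projectName, String name, String json) {
        ProcessDefinitionEntity definition = service.createProcessDefinition(projectName, name, json);
        Map<String, Object> data = new HashMap<>();
        data.put("processDefinitionId", definition.getId());
        return Result.success(data);
    }
}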
public Map<String, Object> createProcessDefinition(User loginUser, String projectName, String name,
String processDefinitionJson, String desc, String locations, String connects) throws JsonProcessingException {
Map<String, Object> result = new HashMap<>(5);
Project project = projectMapper.queryByName(projectName);
// check project auth
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
return checkResult;
}
ProcessDefinition processDefine = new ProcessDefinition();
Date now = new Date();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Map<String, Object> checkProcessJson = checkProcessNodeList(processData, processDefinitionJson);
if (checkProcessJson.get(Constants.STATUS) != Status.SUCCESS) {
return checkProcessJson;
}
processDefine.setName(name);
processDefine.setReleaseState(ReleaseState.OFFLINE);
processDefine.setProjectId(project.getId());
processDefine.setUserId(loginUser.getId());
processDefine.setProcessDefinitionJson(processDefinitionJson);
processDefine.setDescription(desc);
processDefine.setLocations(locations);
processDefine.setConnects(connects);
processDefine.setTimeout(processData.getTimeout());
processDefine.setTenantId(processData.getTenantId());
//custom global params
List<Property> globalParamsList = processData.getGlobalParams();
if (globalParamsList != null && globalParamsList.size() > 0) {
Set<Property> globalParamsSet = new HashSet<>(globalParamsList);
globalParamsList = new ArrayList<>(globalParamsSet);
processDefine.setGlobalParamList(globalParamsList);
}
processDefine.setCreateTime(now);
processDefine.setUpdateTime(now);
processDefine.setFlag(Flag.YES);
processDefineMapper.insert(processDefine);
putMsg(result, Status.SUCCESS);
result.put("processDefinitionId",processDefine.getId());
return result;
}
- Check whether the current user is authorized for the enclosing project
- Validate the process definition JSON, for example checking for cycles
- Build a ProcessDefinition object and insert it into the database
- Set the HTTP response
The responsibilities of ProcessDefinitionService are quite unreasonable; it even performs authorization. As I understand it, validating and inserting into the database would be enough, and everything else could be pushed elsewhere.
@Data
@TableName("t_ds_process_definition")
public class ProcessDefinition
- Check whether the current user is authorized for the enclosing project
- Validate the process definition JSON, for example checking for cycles
- Build a Schedule object and insert it into the database
- Set the HTTP response
- Call OSUtils.checkResource to check the current resources (memory, CPU)
- If resources exceed the threshold, sleep for 1 second and enter the next iteration
- Check that the ZooKeeper connection is alive
- Acquire an InterProcessMutex (a distributed, fair, reentrant mutex), so that only one master can hold the lock
- Query one Command; if it is not null, run the follow-up logic
- Sleep for 1 second and enter the next iteration
- Release the InterProcessMutex before entering the next iteration
/**
* if the process closes, a signal is placed as true, and all threads get this flag to stop working
*/
public class Stopper {
private static volatile AtomicBoolean signal = new AtomicBoolean(false);
public static final boolean isStoped(){
return signal.get();
}
public static final boolean isRunning(){
return !signal.get();
}
public static final void stop(){
signal.getAndSet(true);
}
}
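As the comment above describes, every long-running thread polls this flag in its loop, and a shutdown path flips it. A minimal usage sketch (a hypothetical worker loop, not code from the project):
// Hypothetical sketch of the Stopper protocol: worker threads poll the flag,
// and a shutdown hook flips it so every loop winds down.
public class StopperUsageSketch {
    public static void main(String[] args) throws InterruptedException {
        Runtime.getRuntime().addShutdownHook(new Thread(Stopper::stop));
        while (Stopper.isRunning()) {
            // ... do one unit of work ...
            Thread.sleep(1000L);
        }
        // the flag was set: clean up and exit
    }
}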
if (command != null) {
logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
try{
processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
if (processInstance != null) {
logger.info("start master exec thread , split DAG ...");
masterExecService.execute(new MasterExecThread(processInstance,processDao));
}
}catch (Exception e){
logger.error("scan command error ", e);
processDao.moveToErrorCommand(command, e.toString());
}
}
- Call OSUtils.checkResource to check the current resources (memory, CPU)
- If resources exceed the threshold, sleep for 1 second and enter the next iteration
- Check that the ZooKeeper connection is alive
- Acquire an InterProcessMutex (a distributed, fair, reentrant mutex), so that only one master can hold the lock
- Query one Command; if enough threads are available, create a ProcessInstance from it and hand it to a MasterExecThread (the full loop is sketched below)
- Sleep for 1 second and enter the next iteration
- Release the InterProcessMutex before entering the next iteration
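Putting those steps together, the scheduler loop looks roughly like the sketch below. Names such as checkResource, acquireZkLock, findOneCommand, handleCommand and releaseMutex follow the snippets quoted in this article; the surrounding wiring (conf, zkMasterClient, masterLockPath, the thread counters) is assumed.
// Sketch of the master scheduler loop assembled from the steps above.
while (Stopper.isRunning()) {
    InterProcessMutex mutex = null;
    try {
        // 1. back off while memory/CPU are above the configured thresholds
        if (!OSUtils.checkResource(conf, true)) {
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            continue;
        }
        // 2. only the master holding the lock may scan the command table
        mutex = zkMasterClient.acquireZkLock(zkMasterClient.getZkClient(), masterLockPath);
        // 3. fetch one command (the SQL at the top of this article)
        Command command = processDao.findOneCommand();
        if (command != null && masterExecThreadNum - activeCount > 0) {
            // 4. build a ProcessInstance and hand it to a MasterExecThread
            ProcessInstance processInstance = processDao.handleCommand(
                    logger, OSUtils.getHost(), masterExecThreadNum - activeCount, command);
            if (processInstance != null) {
                masterExecService.execute(new MasterExecThread(processInstance, processDao));
            }
        }
        // 5. sleep one second before the next iteration
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
    } catch (Exception e) {
        logger.error("master scheduler loop error", e);
    } finally {
        // 6. always release the lock before looping again
        AbstractZKClient.releaseMutex(mutex);
    }
}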
But personally I find the definition of ProcessDao not very clear. If it is meant to be the full superset of the mappers, then no other place should call a mapper directly, yet that is not the case; if it only covers process-definition-related operations, its scope is far too broad.
public MasterExecThread(ProcessInstance processInstance,ProcessDao processDao){
this.processDao = processDao;
this.processInstance = processInstance;
int masterTaskExecNum = conf.getInt(Constants.MASTER_EXEC_TASK_THREADS,
Constants.defaultMasterTaskExecNum);
this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread",
masterTaskExecNum);
}
static {
try {
conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH);
}catch (ConfigurationException e){
logger.error("load configuration failed : " + e.getMessage(),e);
System.exit(1);
}
}
A configuration object like this could simply be a global singleton instead of being passed around everywhere; there is no need to new one each time. In general the contents of this class never change anyway.
@Override
public void run() {
// process instance is null
if (processInstance == null){
logger.info("process instance is not exists");
return;
}
// check to see if it's done
if (processInstance.getState().typeIsFinished()){
logger.info("process instance is done : {}",processInstance.getId());
return;
}
try {
if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()){
// sub process complement data
executeComplementProcess();
}else{
// execute flow
executeProcess();
}
}catch (Exception e){
logger.error("master exec thread exception: " + e.getMessage(), e);
logger.error("process execute failed, process id:{}", processInstance.getId());
processInstance.setState(ExecutionStatus.FAILURE);
processInstance.setEndTime(new Date());
processDao.updateProcessInstance(processInstance);
}finally {
taskExecService.shutdown();
// post handle
postHandle();
}
}
- Check whether processInstance is null; if so, return
- Check whether processInstance has already finished (success, failure, cancelled, paused, waiting)
- Check whether this is a complement-data (backfill) run; if so, take the complement path
- Execute the current process definition instance (executeProcess)
- Call taskExecService.shutdown() and wait for all threads to exit normally
The first step feels a bit redundant.
List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());
for(TaskInstance task : taskInstanceList){
if(task.isTaskComplete()){
completeTaskList.put(task.getName(), task);
}
if(task.getState().typeIsFailure() && !task.taskCanRetry()){
errorTaskList.put(task.getName(), task);
}
}
private void buildFlowDag() throws Exception {
recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
forbiddenTaskList = DagHelper.getForbiddenTaskNodeMaps(processInstance.getProcessInstanceJson());
// generate process to get DAG info
List<String> recoveryNameList = getRecoveryNodeNameList();
List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
ProcessDag processDag = generateFlowDag(processInstance.getProcessInstanceJson(),
startNodeNameList, recoveryNameList, processInstance.getTaskDependType());
if(processDag == null){
logger.error("processDag is null");
return;
}
// generate process dag
dag = DagHelper.buildDagGraph(processDag);
}
/**
* the object of DAG
*/
private DAG<String,TaskNode,TaskNodeRelation> dag;
/**
* analysis of DAG
* Node: node
* NodeInfo:node description information
* EdgeInfo: edge description information
*/
public class DAG<Node, NodeInfo, EdgeInfo>
public static DAG<String, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {
DAG<String,TaskNode,TaskNodeRelation> dag = new DAG<>();
/**
* add vertex
*/
if (CollectionUtils.isNotEmpty(processDag.getNodes())){
for (TaskNode node : processDag.getNodes()){
dag.addNode(node.getName(),node);
}
}
/**
* add edge
*/
if (CollectionUtils.isNotEmpty(processDag.getEdges())){
for (TaskNodeRelation edge : processDag.getEdges()){
dag.addEdge(edge.getStartNode(),edge.getEndNode());
}
}
return dag;
}
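To make the generic parameters concrete, here is a tiny hypothetical DAG with the same addNode/addEdge shape (the real DAG&lt;Node, NodeInfo, EdgeInfo&gt; also stores edge info and guards against cycles; this sketch keeps only the adjacency):
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
// A toy DAG mirroring the addNode/addEdge calls in buildDagGraph above.
class MiniDag<Node, NodeInfo> {
    private final Map<Node, NodeInfo> nodes = new HashMap<>();
    private final Map<Node, Set<Node>> edges = new HashMap<>();
    void addNode(Node node, NodeInfo info) {
        nodes.put(node, info);
    }
    void addEdge(Node from, Node to) {
        edges.computeIfAbsent(from, k -> new HashSet<>()).add(to);
    }
    // the successors of a node, i.e. what submitPostNode walks after a task succeeds
    Set<Node> getSubsequentNodes(Node node) {
        return edges.getOrDefault(node, Collections.emptySet());
    }
    public static void main(String[] args) {
        MiniDag<String, String> dag = new MiniDag<>();
        dag.addNode("A", "task A");
        dag.addNode("B", "task B");
        dag.addEdge("A", "B"); // B runs after A
        System.out.println(dag.getSubsequentNodes("A")); // prints [B]
    }
}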
- submitPostNode(null)
- Then a while loop that runs until the process instance reaches a terminal state (success, failure, cancelled, paused, waiting)
- First check whether the instance has timed out; if so, send an alert email
- Get the Map of currently active task nodes, where the key is a MasterBaseTaskExecThread and the value is a Future<Boolean>, i.e. the current state of that thread
- If a task instance has finished, remove it from the Map
- If a task instance succeeded, put it into completeTaskList and call submitPostNode(task.getName())
- If a task instance failed, retry it; otherwise just finish (for example after a manual stop or pause)
- Update the state of the process instance and enter the next iteration (see the loop sketch below)
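Reconstructed from those steps, the monitoring loop looks roughly like this. Names mirror the snippets quoted in this article (activeTaskNode is a ConcurrentHashMap in the real code, which is what makes removing entries while iterating safe); checkProcessTimeout and updateProcessInstanceState are assumed helper names.
// Sketch of the executeProcess monitoring loop, reconstructed from the steps above.
private void executeProcessSketch() throws InterruptedException {
    submitPostNode(null);
    while (!processInstance.getState().typeIsFinished()) {
        // 1. alert by mail if the process instance has exceeded its timeout
        checkProcessTimeout();
        for (Map.Entry<MasterBaseTaskExecThread, Future<Boolean>> entry : activeTaskNode.entrySet()) {
            if (!entry.getValue().isDone()) {
                continue;
            }
            // 2. the task thread finished one way or another: drop it from the map
            activeTaskNode.remove(entry.getKey());
            TaskInstance task = entry.getKey().getTaskInstance();
            if (task.getState().typeIsSuccess()) {
                // 3. success: record it and submit the successor nodes
                completeTaskList.put(task.getName(), task);
                submitPostNode(task.getName());
            } else if (task.getState().typeIsFailure() && task.taskCanRetry()) {
                // 4. failure with retries left: back to the stand-by list
                addTaskToStandByList(task);
            }
            // otherwise (stop/pause) just fall through
        }
        // 5. refresh the process instance state and go around again
        updateProcessInstanceState();
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
    }
}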
private void submitPostNode(String parentNodeName){
List<TaskInstance> submitTaskList = null;
if(parentNodeName == null){
submitTaskList = getStartSubmitTaskList();
}else{
submitTaskList = getPostTaskInstanceByNode(dag, parentNodeName);
}
// if previous node success , post node submit
for(TaskInstance task : submitTaskList){
if(readyToSubmitTaskList.containsKey(task.getName())){
continue;
}
if(completeTaskList.containsKey(task.getName())){
logger.info("task {} has already run success", task.getName());
continue;
}
if(task.getState().typeIsPause() || task.getState().typeIsCancel()){
logger.info("task {} stopped, the state is {}", task.getName(), task.getState().toString());
}else{
addTaskToStandByList(task);
}
}
}
/**
* submit task to execute
* @param taskInstance task instance
* @return TaskInstance
*/
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
MasterBaseTaskExecThread abstractExecThread = null;
if(taskInstance.isSubProcess()){
abstractExecThread = new SubProcessTaskExecThread(taskInstance, processInstance);
}else {
abstractExecThread = new MasterTaskExecThread(taskInstance, processInstance);
}
Future<Boolean> future = taskExecService.submit(abstractExecThread);
activeTaskNode.putIfAbsent(abstractExecThread, future);
return abstractExecThread.getTaskInstance();
}
public class MasterTaskExecThread extends MasterBaseTaskExecThread
/**
* constructor of MasterBaseTaskExecThread
* @param taskInstance task instance
* @param processInstance process instance
*/
public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
this.processDao = BeanContext.getBean(ProcessDao.class);
this.alertDao = BeanContext.getBean(AlertDao.class);
this.processInstance = processInstance;
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
this.cancel = false;
this.taskInstance = taskInstance;
}
/**
* submit task instance and wait complete
* @return true is task quit is true
*/
@Override
public Boolean submitWaitComplete() {
Boolean result = false;
this.taskInstance = submit();
if(!this.taskInstance.getState().typeIsFinished()) {
result = waitTaskQuit();
}
taskInstance.setEndTime(new Date());
processDao.updateTaskInstance(taskInstance);
logger.info("task :{} id:{}, process id:{}, exec thread completed ",
this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );
return result;
}
We can see that every task instance may update the database; combined with all the other threads, the pressure on the database can be considerable. With very many tasks and high concurrency, the JDBC connection pool needs to be sized up accordingly, otherwise the database becomes the bottleneck of the system. And if the number of worker nodes is large, this pressure grows multiplicatively.
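As an illustration of sizing such a pool (DolphinScheduler 1.2 actually configures its Druid pool through properties files, so the HikariCP API and the numbers below are assumptions for demonstration only):
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
// Illustration only: programmatically sizing a JDBC pool with HikariCP.
public class PoolSizingSketch {
    public static void main(String[] args) {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/dolphinscheduler"); // hypothetical URL
        config.setUsername("ds");
        config.setPassword("secret");
        // with many concurrent task-instance updates, a small pool stalls the
        // master/worker threads; size it to the expected write concurrency
        config.setMaximumPoolSize(50);
        config.setMinimumIdle(10);
        try (HikariDataSource dataSource = new HikariDataSource(config)) {
            // hand dataSource to MyBatis / the DAO layer
        }
    }
}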
protected TaskInstance submit(){
Integer commitRetryTimes = conf.getInt(Constants.MASTER_COMMIT_RETRY_TIMES,
Constants.defaultMasterCommitRetryTimes);
Integer commitRetryInterval = conf.getInt(Constants.MASTER_COMMIT_RETRY_INTERVAL,
Constants.defaultMasterCommitRetryInterval);
int retryTimes = 1;
while (retryTimes <= commitRetryTimes){
try {
TaskInstance task = processDao.submitTask(taskInstance, processInstance);
if(task != null){
return task;
}
logger.error("task commit to mysql and queue failed , task has already retry {} times, please check the database", commitRetryTimes);
Thread.sleep(commitRetryInterval);
} catch (Exception e) {
logger.error("task commit to mysql and queue failed : " + e.getMessage(),e);
}
retryTimes += 1;
}
return null;
}
@Transactional(rollbackFor = Exception.class)
public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance){
logger.info("start submit task : {}, instance id:{}, state: {}, ",
taskInstance.getName(), processInstance.getId(), processInstance.getState() );
processInstance = this.findProcessInstanceDetailById(processInstance.getId());
//submit to mysql
TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance);
if(task.isSubProcess() && !task.getState().typeIsFinished()){
ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task);
TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task);
}else if(!task.getState().typeIsFinished()){
//submit to task queue
task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
submitTaskToQueue(task);
}
logger.info("submit task :{} state:{} complete, instance id:{} state: {} ",
taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
return task;
}
public Boolean submitTaskToQueue(TaskInstance taskInstance) {
try{
// task cannot submit when running
if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){
logger.info(String.format("submit to task queue, but task [%s] state already be running. ", taskInstance.getName()));
return true;
}
if(checkTaskExistsInTaskQueue(taskInstance)){
logger.info(String.format("submit to task queue, but task [%s] already exists in the queue.", taskInstance.getName()));
return true;
}
logger.info("task ready to queue: {}" , taskInstance);
taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance));
logger.info(String.format("master insert into queue success, task : %s", taskInstance.getName()) );
return true;
}catch (Exception e){
logger.error("submit task to queue Exception: ", e);
logger.error("task queue error : %s", JSONUtils.toJson(taskInstance));
return false;
}
}
protected ITaskQueue taskQueue = TaskQueueFactory.getTaskQueueInstance();
public static ITaskQueue getTaskQueueInstance() {
String queueImplValue = CommonUtils.getQueueImplValue();
if (StringUtils.isNotBlank(queueImplValue)) {
logger.info("task queue impl use zookeeper ");
return TaskQueueZkImpl.getInstance();
}else{
logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit ");
System.exit(-1);
}
return null;
}
Since only ZooKeeper is supported, this redundant code should simply be deleted.
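Once the dead branch is removed, the factory could shrink to the sketch below (failing fast with an exception instead of System.exit is a design choice of the sketch, not the project's code):
public static ITaskQueue getTaskQueueInstance() {
    String queueImplValue = CommonUtils.getQueueImplValue();
    if (StringUtils.isBlank(queueImplValue)) {
        // fail fast; there is no fallback implementation anyway
        throw new IllegalStateException("property dolphinscheduler.queue.impl can't be blank");
    }
    logger.info("task queue impl use zookeeper ");
    return TaskQueueZkImpl.getInstance();
}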
/**
* add task to tasks queue
*
* @param key task queue name
* @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
*/
@Override
public void add(String key, String value) {
try {
String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value));
logger.info("add task : {} to tasks queue , result success",result);
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
}
}
- Query the taskInstance by taskInstance.id, i.e. fetch its latest state
- Decide from the parameters whether timeout checking is enabled
- Then an "infinite" while loop (sketched after this list)
- Inside the loop, exit if the task has finished
- Refresh the latest state of the task instance and the process instance
- Sleep for 1 second and enter the next iteration
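A sketch of that polling loop, using the DAO method names quoted elsewhere in this article and omitting the timeout-alert branch:
// Sketch of waitTaskQuit: poll the database until the worker reports a terminal state.
private Boolean waitTaskQuitSketch() throws InterruptedException {
    TaskInstance task = processDao.findTaskInstanceById(taskInstance.getId());
    while (Stopper.isRunning()) {
        if (task.getState().typeIsFinished()) {
            break; // the worker reported success/failure/kill/...
        }
        // refresh the latest task and process instance state
        task = processDao.findTaskInstanceById(taskInstance.getId());
        processInstance = processDao.findProcessInstanceDetailById(processInstance.getId());
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
    }
    return true;
}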
Stopper.isRunning() acts as a global flag that controls a great many threads, each sitting in such an "infinite" while loop. Even though they all sleep between iterations, it still feels a bit wasteful.
private Runnable heartBeatThread(){
Runnable heartBeatThread = new Runnable() {
@Override
public void run() {
if(Stopper.isRunning()) {
// send heartbeat to zk
if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server");
return;
}
zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX);
}
}
};
return heartBeatThread;
}
public void heartBeatForZk(String znode, String serverType){
try {
//check dead or not in zookeeper
if(zkClient.getState() == CuratorFrameworkState.STOPPED || checkIsDeadServer(znode, serverType)){
stoppable.stop("i was judged to death, release resources and stop myself");
return;
}
byte[] bytes = zkClient.getData().forPath(znode);
String resInfoStr = new String(bytes);
String[] splits = resInfoStr.split(Constants.COMMA);
if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
return;
}
String str = splits[0] + Constants.COMMA
+ splits[1] + Constants.COMMA
+ OSUtils.cpuUsage() + Constants.COMMA
+ OSUtils.memoryUsage() + Constants.COMMA
+ OSUtils.loadAverage() + Constants.COMMA
+ splits[5] + Constants.COMMA
+ DateUtils.dateToString(new Date());
zkClient.setData().forPath(znode,str.getBytes());
} catch (Exception e) {
logger.error("heartbeat for zk failed : " + e.getMessage(), e);
stoppable.stop("heartbeat for zk exception, release resources and stop myself");
}
}
Note that zkMasterClient is of type ZKMasterClient. Does that mean there is also a similar ZKWorkerClient, used to report the worker node's system resource information in the same way?
public synchronized void stop(String cause) {
try {
//execute only once
if(Stopper.isStoped()){
return;
}
logger.info("master server is stopping ..., cause : {}", cause);
// set stop signal is true
Stopper.stop();
try {
//thread sleep 3 seconds for thread quitely stop
Thread.sleep(3000L);
}catch (Exception e){
logger.warn("thread sleep exception:" + e.getMessage(), e);
}
try {
heartbeatMasterService.shutdownNow();
}catch (Exception e){
logger.warn("heartbeat service stopped exception");
}
logger.info("heartbeat service stopped");
//close quartz
try{
QuartzExecutors.getInstance().shutdown();
}catch (Exception e){
logger.warn("Quartz service stopped exception:{}",e.getMessage());
}
logger.info("Quartz service stopped");
try {
ThreadPoolExecutors.getInstance().shutdown();
}catch (Exception e){
logger.warn("threadpool service stopped exception:{}",e.getMessage());
}
logger.info("threadpool service stopped");
try {
masterSchedulerService.shutdownNow();
}catch (Exception e){
logger.warn("master scheduler service stopped exception:{}",e.getMessage());
}
logger.info("master scheduler service stopped");
try {
zkMasterClient.close();
}catch (Exception e){
logger.warn("zookeeper service stopped exception:{}",e.getMessage());
}
logger.info("zookeeper service stopped");
} catch (Exception e) {
logger.error("master server stop exception : " + e.getMessage(), e);
System.exit(-1);
}
}
- Stopper.stop(): flip the loop flag of every thread
- Sleep for 3 seconds
- heartbeatMasterService.shutdownNow
- QuartzExecutors.getInstance().shutdown
- ThreadPoolExecutors.getInstance().shutdown
- masterSchedulerService.shutdownNow
- zkMasterClient.close
// start QuartzExecutors
// what system should do if exception
try {
ProcessScheduleJob.init(processDao);
QuartzExecutors.getInstance().start();
} catch (Exception e) {
try {
QuartzExecutors.getInstance().shutdown();
} catch (SchedulerException e1) {
logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
}
logger.error("start Quartz failed : " + e.getMessage(), e);
}
/**
* register hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
if (zkMasterClient.getActiveMasterNum() <= 1) {
for (int i = 0; i < Constants.DOLPHINSCHEDULER_WARN_TIMES_FAILOVER; i++) {
zkMasterClient.getAlertDao().sendServerStopedAlert(
1, OSUtils.getHost(), "Master-Server");
}
}
stop("shutdownhook");
}
}));
public static void init(ProcessDao processDao) {
ProcessScheduleJob.processDao = processDao;
}
@Override
public void run() {
while (Stopper.isRunning()){
InterProcessMutex mutex = null;
try {
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
//check memory and cpu usage and threads
boolean runCheckFlag = OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor);
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
if(!runCheckFlag) {
continue;
}
//whether have tasks, if no tasks , no need lock //get all tasks
List<String> tasksQueueList = taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
if (CollectionUtils.isEmpty(tasksQueueList)){
continue;
}
// creating distributed locks, lock path /dolphinscheduler/lock/worker
mutex = zkWorkerClient.acquireZkLock(zkWorkerClient.getZkClient(),
zkWorkerClient.getWorkerLockPath());
// task instance id str
List<String> taskQueueStrArr = taskQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskNum);
for(String taskQueueStr : taskQueueStrArr){
if (StringUtils.isEmpty(taskQueueStr)) {
continue;
}
if (!checkThreadCount(poolExecutor)) {
break;
}
// get task instance id
taskInstId = getTaskInstanceId(taskQueueStr);
// mainly to wait for the master insert task to succeed
waitForTaskInstance();
taskInstance = processDao.getTaskInstanceDetailByTaskId(taskInstId);
// verify task instance is null
if (verifyTaskInstanceIsNull(taskInstance)) {
logger.warn("remove task queue : {} due to taskInstance is null", taskQueueStr);
removeNodeFromTaskQueue(taskQueueStr);
continue;
}
Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
taskInstance.getProcessDefine().getUserId());
// verify tenant is null
if (verifyTenantIsNull(tenant)) {
logger.warn("remove task queue : {} due to tenant is null", taskQueueStr);
removeNodeFromTaskQueue(taskQueueStr);
continue;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
String userQueue = processDao.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
logger.info("worker fetch taskId : {} from queue ", taskInstId);
if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
continue;
}
// local execute path
String execLocalPath = getExecLocalPath();
logger.info("task instance local execute path : {} ", execLocalPath);
// init task
taskInstance.init(OSUtils.getHost(),
new Date(),
execLocalPath);
// check and create Linux users
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
tenant.getTenantCode(), logger);
logger.info("task : {} ready to submit to task scheduler thread",taskInstId);
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
// remove node from zk
removeNodeFromTaskQueue(taskQueueStr);
}
}catch (Exception e){
logger.error("fetch task thread failure" ,e);
}finally {
AbstractZKClient.releaseMutex(mutex);
}
}
}
A really ugly way to write this: the runCheckFlag flag is computed first, the thread sleeps, and only then is the value checked.
Shouldn't an interface like hasTask be used for this check instead? All the code needs to know here is whether any tasks exist; fetching the entire task list for that is inappropriate and somewhat wasteful of memory.
In fact, a priority-aware distributed lock could be built on top of ZooKeeper: a client declares a weight when requesting the lock, and a larger weight increases its chance of acquiring it.
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2...
- Check whether taskQueueStr is empty; this feels a bit redundant
- Check whether enough threads are available
- Extract the task ID from taskQueueStr, i.e. the fourth field after splitting on _ (see the parsing sketch after this list)
- Wait for the task instance record to be inserted into the database, looping up to 30 times with a 1-second wait each time. The comment says the database operation may be delayed; where that delay would come from is unclear
- Fetch the task instance details by task ID
- Fetch the tenant via the task instance
- Fetch the user queue via the task instance. Why not fetch it while querying the task instance details, or fetch everything at once in getTaskInstanceDetailByTaskId?
- Check whether the task instance may run on the current node, and move on to the next task if not. Why isn't this checked up front, instead of after two DB queries?
- Initialize the task instance
- Check that the work directory and Linux user exist, creating them if absent. Why not create them in advance instead of checking on every task?
- Submit the task to a TaskScheduleThread for execution
- Remove the corresponding task node from taskQueue
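Extracting the task instance ID from a queue entry is then a simple split, as in this hypothetical helper:
// Parse the ${taskId} field out of a queue entry of the form
// ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2
public class TaskQueueEntrySketch {
    static int getTaskInstanceId(String taskQueueStr) {
        String[] parts = taskQueueStr.split("_");
        return Integer.parseInt(parts[3]); // the fourth field is ${taskId}
    }
    public static void main(String[] args) {
        // priority 1, process instance 100, task priority 2, task 2001, two candidate hosts
        System.out.println(getTaskInstanceId("1_100_2_2001_host1,host2")); // prints 2001
    }
}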
@Override
public void run() {
try {
// update task state is running according to task type
updateTaskState(taskInstance.getTaskType());
logger.info("script path : {}", taskInstance.getExecutePath());
// task node
TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
// copy hdfs/minio file to local
copyHdfsToLocal(processDao,
taskInstance.getExecutePath(),
createProjectResFiles(taskNode),
logger);
// get process instance according to tak instance
ProcessInstance processInstance = taskInstance.getProcessInstance();
// set task props
TaskProps taskProps = new TaskProps(taskNode.getParams(),
taskInstance.getExecutePath(),
processInstance.getScheduleTime(),
taskInstance.getName(),
taskInstance.getTaskType(),
taskInstance.getId(),
CommonUtils.getSystemEnvPath(),
processInstance.getTenantCode(),
processInstance.getQueue(),
taskInstance.getStartTime(),
getGlobalParamsMap(),
taskInstance.getDependency(),
processInstance.getCmdTypeIfComplement());
// set task timeout
setTaskTimeout(taskProps, taskNode);
taskProps.setTaskAppId(String.format("%s_%s_%s",
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
task = TaskManager.newTask(taskInstance.getTaskType(),
taskProps,
taskLogger);
// task init
task.init();
// task handle
task.handle();
// task result process
task.after();
}catch (Exception e){
logger.error("task scheduler failure", e);
kill();
// update task instance state
processDao.changeTaskState(ExecutionStatus.FAILURE,
new Date(),
taskInstance.getId());
}
logger.info("task instance id : {},task final status : {}",
taskInstance.getId(),
task.getExitStatus());
// update task instance state
processDao.changeTaskState(task.getExitStatus(),
new Date(),
taskInstance.getId());
}
Again, let's walk through the run method step by step.
- Update the task state to ExecutionStatus.RUNNING_EXEUTION
- Get the task node information from the task instance
- Copy files from HDFS/MinIO to the local machine, including user-uploaded resources such as jars, SQL files, and configuration files
- Build the TaskProps object
- Initialize the task logger
- Construct the AbstractTask instance
- Call the AbstractTask's init, handle, and after in turn
- Update the task instance state: failure on exception, success, and so on
public static AbstractTask newTask(String taskType, TaskProps props, Logger logger)
throws IllegalArgumentException {
switch (EnumUtils.getEnum(TaskType.class,taskType)) {
case SHELL:
return new ShellTask(props, logger);
case PROCEDURE:
return new ProcedureTask(props, logger);
case SQL:
return new SqlTask(props, logger);
case MR:
return new MapReduceTask(props, logger);
case SPARK:
return new SparkTask(props, logger);
case FLINK:
return new FlinkTask(props, logger);
case PYTHON:
return new PythonTask(props, logger);
case DEPENDENT:
return new DependentTask(props, logger);
case HTTP:
return new HttpTask(props, logger);
default:
logger.error("unsupport task type: {}", taskType);
throw new IllegalArgumentException("not support task type");
    }
}
public ShellTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
this.taskDir = taskProps.getTaskDir();
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskProps.getTaskDir(),
taskProps.getTaskAppId(),
taskProps.getTaskInstId(),
taskProps.getTenantCode(),
taskProps.getEnvFile(),
taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(),
logger);
this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
}
public void handle() throws Exception {
try {
// construct process
exitStatusCode = shellCommandExecutor.run(buildCommand(), processDao);
} catch (Exception e) {
logger.error("shell task failure", e);
exitStatusCode = -1;
}
}
private void buildProcess(String commandFile) throws IOException {
//init process builder
ProcessBuilder processBuilder = new ProcessBuilder();
// setting up a working directory
processBuilder.directory(new File(taskDir));
// merge error information to standard output stream
processBuilder.redirectErrorStream(true);
// setting up user to run commands
processBuilder.command("sudo", "-u", tenantCode, commandType(), commandFile);
process = processBuilder.start();
// print command
printCommand(processBuilder);
}
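For a shell task, then, the worker ends up executing something along these lines (the tenant code and paths are placeholders, and commandType() is assumed to resolve to sh for shell tasks):
sudo -u tenant_code sh /exec/local/path/task_app_id.command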
public void handle(){
// set the name of the current thread
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
try{
TaskInstance taskInstance = null;
while(Stopper.isRunning()){
taskInstance = processDao.findTaskInstanceById(this.taskProps.getTaskInstId());
if(taskInstance == null){
exitStatusCode = -1;
break;
}
if(taskInstance.getState() == ExecutionStatus.KILL){
this.cancel = true;
}
if(this.cancel || allDependentTaskFinish()){
break;
}
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
if(cancel){
exitStatusCode = Constants.EXIT_CODE_KILL;
}else{
DependResult result = getTaskDependResult();
exitStatusCode = (result == DependResult.SUCCESS) ?
Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
}
}catch (Exception e){
logger.error(e.getMessage(),e);
exitStatusCode = -1;
}
}
- Fetch the latest task instance information by task instance ID
- Check whether the state is kill; if so, exit
- Check whether all dependent tasks have finished; if so, exit
- Sleep for 1 second and enter the next iteration
private boolean allDependentTaskFinish(){
boolean finish = true;
for(DependentExecute dependentExecute : dependentTaskList){
for(Map.Entry<String, DependResult> entry: dependentExecute.getDependResultMap().entrySet()) {
if(!dependResultMap.containsKey(entry.getKey())){
dependResultMap.put(entry.getKey(), entry.getValue());
//save depend result to log
logger.info("dependent item complete {} {},{}",
DEPENDENT_SPLIT, entry.getKey(), entry.getValue().toString());
}
}
if(!dependentExecute.finish(dependentDate)){
finish = false;
}
}
return finish;
}
if(taskProps.getScheduleTime() != null){
this.dependentDate = taskProps.getScheduleTime();
}else{
this.dependentDate = taskProps.getTaskStartTime();
}
/**
* calculate dependent result for one dependent item.
* @param dependentItem dependent item
* @param dateIntervals date intervals
* @return dateIntervals
*/
private DependResult calculateResultForTasks(DependentItem dependentItem,
List<DateInterval> dateIntervals) {
DependResult result = DependResult.FAILED;
for(DateInterval dateInterval : dateIntervals){
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(),
dateInterval);
if(processInstance == null){
logger.error("cannot find the right process instance: definition id:{}, start:{}, end:{}",
dependentItem.getDefinitionId(), dateInterval.getStartTime(), dateInterval.getEndTime() );
return DependResult.FAILED;
}
if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){
result = getDependResultByState(processInstance.getState());
}else{
TaskInstance taskInstance = null;
List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());
for(TaskInstance task : taskInstanceList){
if(task.getName().equals(dependentItem.getDepTasks())){
taskInstance = task;
break;
}
}
if(taskInstance == null){
// cannot find task in the process instance
// maybe because process instance is running or failed.
result = getDependResultByState(processInstance.getState());
}else{
result = getDependResultByState(taskInstance.getState());
}
}
if(result != DependResult.SUCCESS){
break;
}
}
return result;
}
- Call Stopper.stop to set the global flag and break every thread out of its "infinite" loop
- Sleep for 3 seconds
- Stop the worker heartbeat: heartbeatWorkerService.shutdownNow
- Stop the worker task thread pool: ThreadPoolExecutors.getInstance().shutdown
- Stop the killExecutor pool: killExecutorService.shutdownNow
- Stop the fetchTask pool: fetchTaskExecutorService.shutdownNow
- Close the ZooKeeper client: zkWorkerClient.close
private ZKWorkerClient(){
init();
}
/**
* init
*/
private void init(){
// init system znode
this.initSystemZNode();
// monitor worker
this.listenerWorker();
// register worker
this.registWorker();
}
protected void initSystemZNode(){
try {
createNodePath(getMasterZNodeParentPath());
createNodePath(getWorkerZNodeParentPath());
createNodePath(getDeadZNodeParentPath());
} catch (Exception e) {
logger.error("init system znode failed : " + e.getMessage(),e);
}
}
private void createNodePath(String zNodeParentPath) throws Exception {
if(null == zkClient.checkExists().forPath(zNodeParentPath)){
zkClient.create().creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(zNodeParentPath);
}
}
public String registerServer(ZKNodeType zkNodeType) throws Exception {
String registerPath = null;
String host = OSUtils.getHost();
if(checkZKNodeExists(host, zkNodeType)){
logger.error("register failure , {} server already started on host : {}" ,
zkNodeType.toString(), host);
return registerPath;
}
registerPath = createZNodePath(zkNodeType);
// handle dead server
handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP);
return registerPath;
}
private void listenerWorker(){
workerPathChildrenCache = new PathChildrenCache(zkClient, getZNodeParentPath(ZKNodeType.WORKER), true, defaultThreadFactory);
try {
workerPathChildrenCache.start();
workerPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
logger.info("node added : {}" ,event.getData().getPath());
break;
case CHILD_REMOVED:
String path = event.getData().getPath();
//find myself dead
String serverHost = getHostByEventDataPath(path);
if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){
return;
}
break;
case CHILD_UPDATED:
break;
default:
break;
}
}
});
}catch (Exception e){
logger.error("monitor worker failed : " + e.getMessage(),e);
}
}
protected boolean checkServerSelfDead(String serverHost, ZKNodeType zkNodeType) {
if (serverHost.equals(OSUtils.getHost())) {
logger.error("{} server({}) of myself dead , stopping...",
zkNodeType.toString(), serverHost);
stoppable.stop(String.format(" {} server {} of myself dead , stopping...",
zkNodeType.toString(), serverHost));
return true;
}
return false;
}
zkWorkerClient.setStoppable(this);
private ZKMasterClient(ProcessDao processDao){
this.processDao = processDao;
init();
}
public void init(){
// init dao
this.initDao();
InterProcessMutex mutex = null;
try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
String znodeLock = getMasterStartUpLockPath();
mutex = new InterProcessMutex(zkClient, znodeLock);
mutex.acquire();
// init system znode
this.initSystemZNode();
// monitor master
this.listenerMaster();
// monitor worker
this.listenerWorker();
// register master
this.registerMaster();
// check if fault tolerance is required,failure and tolerance
if (getActiveMasterNum() == 1) {
failoverWorker(null, true);
failoverMaster(null);
}
}catch (Exception e){
logger.error("master start up exception : " + e.getMessage(),e);
}finally {
releaseMutex(mutex);
}
}
- initDao: in fact it just initializes alertDao by calling DaoFactory.getDaoInstance(AlertDao.class). A rather ugly mix of initialization styles: processDao is passed in, yet alertDao is created like this
- Acquire the distributed lock on the path /dolphinscheduler/lock/failover/master
- Once the lock is held, call initSystemZNode, listenerMaster, listenerWorker and registerMaster in turn
- If the number of active masters is exactly 1, perform failover. Why is not clear yet
private void failoverMaster(String masterHost) {
logger.info("start master failover ...");
List<ProcessInstance> needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost);
//updateProcessInstance host is null and insert into command
for(ProcessInstance processInstance : needFailoverProcessInstanceList){
processDao.processNeedFailoverProcessInstances(processInstance);
}
logger.info("master failover end");
}
@Transactional(rollbackFor = Exception.class)
public void processNeedFailoverProcessInstances(ProcessInstance processInstance){
//1 update processInstance host is null
processInstance.setHost("null");
processInstanceMapper.updateById(processInstance);
//2 insert into recover command
Command cmd = new Command();
cmd.setProcessDefinitionId(processInstance.getProcessDefinitionId());
cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
cmd.setExecutorId(processInstance.getExecutorId());
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
createCommand(cmd);
}
Command command = new Command();
command.setCommandType(CommandType.SCHEDULER);
command.setExecutorId(schedule.getUserId());
command.setFailureStrategy(schedule.getFailureStrategy());
command.setProcessDefinitionId(schedule.getProcessDefinitionId());
command.setScheduleTime(scheduledFireTime);
command.setStartTime(fireTime);
command.setWarningGroupId(schedule.getWarningGroupId());
command.setWorkerGroupId(schedule.getWorkerGroupId());
command.setWarningType(schedule.getWarningType());
command.setProcessInstancePriority(schedule.getProcessInstancePriority());
This piece of logic really ought to be handled, otherwise the instance could genuinely stay in the running state forever.
/**
* failover worker tasks
*
* 1. kill yarn job if there are yarn jobs in tasks.
* 2. change task state from running to need failover.
* 3. failover all tasks when workerHost is null
* @param workerHost worker host
* @param needCheckWorkerAlive need check worker alive
* @throws Exception exception
*/
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
logger.info("start worker[{}] failover ...", workerHost);
List<TaskInstance> needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost);
for(TaskInstance taskInstance : needFailoverTaskInstanceList){
if(needCheckWorkerAlive){
if(!checkTaskInstanceNeedFailover(taskInstance)){
continue;
}
}
ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if(instance!=null){
taskInstance.setProcessInstance(instance);
}
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskInstance);
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processDao.saveTaskInstance(taskInstance);
}
logger.info("end worker[{}] failover ...", workerHost);
}
/**
* determine if you can try again
* @return can try result
*/
public boolean taskCanRetry() {
if(this.isSubProcess()){
return false;
}
if(this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){
return true;
}else {
return (this.getState().typeIsFailure()
&& this.getRetryTimes() < this.getMaxRetryTimes());
}
}
In taskCanRetry, a task in the ExecutionStatus.NEED_FAULT_TOLERANCE state can always be retried, no matter how many times it has been retried already. What is the point of that?
if(task.getState().typeIsFailure()){
if(task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){
this.recoverToleranceFaultTaskList.add(task);
}
if(task.taskCanRetry()){
addTaskToStandByList(task);
}else{
// node failure, based on failure strategy
errorTaskList.put(task.getName(), task);
completeTaskList.put(task.getName(), task);
if(processInstance.getFailureStrategy() == FailureStrategy.END){
killTheOtherTasks();
}
}
continue;
}
- If a worker's ZooKeeper connection times out, it stops its heartbeat and stops fetching tasks, waits for all running task instances to finish (normally or with failure), and updates their state in the database
- If a master's ZooKeeper connection times out, it stops its heartbeat, stops fetching process instances, and stops scheduling all of its process instances
- If a master finds a failed task instance in the ExecutionStatus.NEED_FAULT_TOLERANCE state within some process instance, it reruns it