Hera source code analysis: the execution flow of a single task trigger

Posted by scx_white

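In hera, a task can be triggered in several ways: manually by an analyst from the front end, by a timed schedule, by an upstream dependency, by a rerun, by a lost-signal recovery, and so on. Whatever the trigger, the entry point always ends up in Master#run (for tasks triggered from the development center, the entry point is Master#debug).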

This article walks through the flow of a manually triggered task.

Triggering the task

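In the latest version, manual triggers come in three types: manual execution, manual recovery and super recovery. The differences between them are not repeated here (see the hera operation documentation); this article uses manual recovery as the example.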

When we click Execute, a request is sent to the back end.

Worker side

First, look at the stack trace on the worker side:

writeAndFlush:28, NettyChannel (com.dfire.core.netty)
writeAndFlush:32, FailFastCluster (com.dfire.core.netty.cluster)
buildMessage:100, WorkerHandleWebRequest (com.dfire.core.netty.worker.request)
handleWebExecute:29, WorkerHandleWebRequest (com.dfire.core.netty.worker.request)
executeJobFromWeb:312, WorkClient (com.dfire.core.netty.worker)
execute:409, ScheduleOperatorController (com.dfire.controller)
invoke:-1, ScheduleOperatorController$$FastClassBySpringCGLIB$$cddb34c8 (com.dfire.controller)
invoke:204, MethodProxy (org.springframework.cglib.proxy)
invokeJoinpoint:738, CglibAopProxy$CglibMethodInvocation (org.springframework.aop.framework)
proceed:157, ReflectiveMethodInvocation (org.springframework.aop.framework)
proceed:85, MethodInvocationProceedingJoinPoint (org.springframework.aop.aspectj)
around:72, HeraAspect (com.dfire.config)
// (remaining frames omitted)

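The stack trace shows that before the controller method runs, an AOP-based permission check, HeraAspect#around, is invoked. Once the check passes, ScheduleOperatorController#execute is called; this method is the entry point for task execution. After that, WorkerHandleWebRequest#handleWebExecute and WorkerHandleWebRequest#buildMessage build the netty message body, and finally the message is sent to the master using a fail-fast fault-tolerance strategy (FailFastCluster#writeAndFlush).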
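HeraAspect#around is a Spring AOP around advice. To illustrate the same idea without Spring, the sketch below wraps a controller in a JDK dynamic proxy that checks permissions before letting the call proceed. It swaps in java.lang.reflect.Proxy for Spring AOP/CGLIB, and the OperatorController interface, the convention that the first argument identifies the caller, and the allowed-user set are all hypothetical rather than hera's real permission model.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Set;

// Around-advice sketch using a JDK dynamic proxy (a stand-in for Spring AOP).
// OperatorController and the permission rule are hypothetical.
public class AroundAdviceSketch {

    // Hypothetical stand-in for the controller being advised.
    public interface OperatorController {
        String execute(String user, long actionId);
    }

    // The real target, reached only if the advice lets the call proceed.
    static class RealController implements OperatorController {
        @Override
        public String execute(String user, long actionId) {
            return "executed " + actionId;
        }
    }

    // Wraps target so that every call is permission-checked before it proceeds.
    public static OperatorController withPermissionCheck(OperatorController target, Set<String> allowedUsers) {
        InvocationHandler around = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(target, args);   // toString/equals/hashCode pass straight through
            }
            String user = (String) args[0];            // assumption: first argument identifies the caller
            if (!allowedUsers.contains(user)) {        // "before" part: reject unauthorized calls
                throw new SecurityException("no permission: " + user);
            }
            return method.invoke(target, args);        // "proceed": call the real method
        };
        return (OperatorController) Proxy.newProxyInstance(
                OperatorController.class.getClassLoader(),
                new Class<?>[]{OperatorController.class},
                around);
    }

    public static void main(String[] args) {
        OperatorController controller = withPermissionCheck(new RealController(), Set.of("alice"));
        System.out.println(controller.execute("alice", 42L));   // executed 42
        try {
            controller.execute("bob", 42L);
        } catch (SecurityException e) {
            System.out.println(e.getMessage());                   // no permission: bob
        }
    }
}
```

As with the real aspect, the caller only ever holds the wrapped object, so the check cannot be bypassed by calling the controller directly.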


Now let's look at the controller entry point in detail:

    @RequestMapping(value = "/manual", method = RequestMethod.GET)
    @ResponseBody
    @ApiOperation("Manual execution endpoint")
    public JsonResponse execute(@JsonSerialize(using = ToStringSerializer.class) @ApiParam(value = "version id", required = true) Long actionId,
                                @ApiParam(value = "trigger type: 2 manual execution, 3 manual recovery, 6 super recovery", required = true) Integer triggerType,
                                @RequestParam(required = false) @ApiParam(value = "task execution group", required = false) String execUser)
            throws InterruptedException, ExecutionException, HeraException, TimeoutException {
        // some validation code omitted
        String configs = heraJob.getConfigs();
        // create a hera_action_history object and insert the execution record into MySQL
        HeraJobHistory actionHistory = HeraJobHistory.builder().build();
        actionHistory.setJobId(heraAction.getJobId());
        actionHistory.setActionId(heraAction.getId());
        actionHistory.setTriggerType(triggerTypeEnum.getId());
        actionHistory.setOperator(heraJob.getOwner());
        actionHistory.setIllustrate(execUser);
        actionHistory.setStatus(StatusEnum.RUNNING.toString());
        actionHistory.setStatisticEndTime(heraAction.getStatisticEndTime());
        actionHistory.setHostGroupId(heraAction.getHostGroupId());
        heraJobHistoryService.insert(actionHistory);
        heraAction.setScript(heraJob.getScript());
        heraAction.setHistoryId(actionHistory.getId());
        heraAction.setConfigs(configs);
        heraAction.setAuto(heraJob.getAuto());
        heraAction.setHostGroupId(heraJob.getHostGroupId());
        heraJobActionService.update(heraAction);
        // send the task execution request to the master
        workClient.executeJobFromWeb(JobExecuteKind.ExecuteKind.ManualKind, actionHistory.getId());

        String ownerId = getOwnerId();
        if (ownerId == null) {
            ownerId = "0";
        }
        // add an operation record
        addJobRecord(heraJob.getId(), String.valueOf(actionId), RecordTypeEnum.Execute, execUser, ownerId);
        return new JsonResponse(true, String.valueOf(actionId));
    }

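This code is straightforward and does three things:
1. Creates a hera_action_history object and inserts the task's execution record into MySQL.
2. Sends a task-execution message to the master over netty.
3. Adds an operation record for the execution.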
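The endpoint receives triggerType as a plain Integer, documented in its @ApiParam as 2 = manual execution, 3 = manual recovery, 6 = super recovery, and later stores triggerTypeEnum.getId() in the history record. A minimal sketch of such an id-to-enum lookup follows; the enum and constant names are hypothetical, and hera's real trigger-type enum (resolved in the omitted validation code) may define more values.

```java
import java.util.Arrays;

public class TriggerTypeSketch {

    // Hypothetical enum; ids as documented in the /manual endpoint's @ApiParam.
    enum ManualTrigger {
        MANUAL(2),          // manual execution
        MANUAL_RECOVER(3),  // manual recovery
        SUPER_RECOVER(6);   // super recovery

        final int id;

        ManualTrigger(int id) {
            this.id = id;
        }

        // Maps the Integer received from the front end to an enum constant.
        static ManualTrigger fromId(int id) {
            return Arrays.stream(values())
                    .filter(t -> t.id == id)
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("unknown trigger type: " + id));
        }
    }

    public static void main(String[] args) {
        System.out.println(ManualTrigger.fromId(3));   // MANUAL_RECOVER
    }
}
```

Rejecting unknown ids up front keeps an invalid value from ever reaching the history table.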

The part that matters here is step 2. Following the stack trace further:

    public void executeJobFromWeb(ExecuteKind kind, Long id) throws ExecutionException, InterruptedException, TimeoutException {
        RpcWebResponse.WebResponse response = WorkerHandleWebRequest.handleWebExecute(workContext, kind, id)
                .get(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS);
        if (response.getStatus() == ResponseStatus.Status.ERROR) {
            ErrorLog.error(response.getErrorText());
        }
    }

This code calls WorkerHandleWebRequest.handleWebExecute, which returns a Future; future.get then blocks the request for at most HeraGlobalEnv.getRequestTimeout() seconds. Continuing:

    public static Future<WebResponse> handleWebExecute(final WorkContext workContext, ExecuteKind kind, Long id) {
        return buildMessage(WebRequest.newBuilder()
                .setRid(AtomicIncrease.getAndIncrement())
                .setOperate(WebOperate.ExecuteJob)
                .setEk(kind)
                .setId(String.valueOf(id))
                .build(), workContext, "[execute] no response from master within " + HeraGlobalEnv.getRequestTimeout() + " seconds: " + id);
    }

    private static Future<WebResponse> buildMessage(WebRequest request, WorkContext workContext, String errorMsg) {
        CountDownLatch latch = new CountDownLatch(1);
        WorkResponseListener responseListener = new WorkResponseListener(request, false, latch, null);
        workContext.getHandler().addListener(responseListener);
        Future<WebResponse> future = workContext.getWorkWebThreadPool().submit(() -> {
            latch.await(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS);
            if (!responseListener.getReceiveResult()) {
                ErrorLog.error(errorMsg);
            }
            workContext.getHandler().removeListener(responseListener);
            return responseListener.getWebResponse();
        });
        try {
            workContext.getServerChannel().writeAndFlush(SocketMessage.newBuilder()
                    .setKind(SocketMessage.Kind.WEB_REQUEST)
                    .setBody(request.toByteString())
                    .build());
            SocketLog.info("1.WorkerHandleWebRequest: send web request to master requestId =", request.getRid());
        } catch (RemotingException e) {
            workContext.getHandler().removeListener(responseListener);
            ErrorLog.error("1.WorkerHandleWebRequest: send web request to master exception requestId =" + request.getRid(), e);
        }
        return future;
    }

handleWebExecute 方法中,新建了一个 WebRequest 对象,需要注意的是该对象的 operator 参数为 WebOperate.ExecuteJobidhera_action_history 记录的 id
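buildMessage then contains three key pieces of code:
1. CountDownLatch latch = new CountDownLatch(1); a task submitted to the worker's web thread pool waits on this latch asynchronously (for at most the request timeout), and the latch is released in WorkResponseListener.
2. WorkResponseListener responseListener = new WorkResponseListener(request, false, latch, null); the listener is registered with the handler before the message is sent. The class looks like this: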

public class WorkResponseListener extends ResponseListenerAdapter {

    private RpcWebRequest.WebRequest request;
    private volatile Boolean receiveResult;
    private CountDownLatch latch;
    private RpcWebResponse.WebResponse webResponse;

    // constructor and getters omitted

    @Override
    public void onWebResponse(RpcWebResponse.WebResponse response) {
        if (request.getRid() == response.getRid()) {
            try {
                webResponse = response;
                receiveResult = true;
            } catch (Exception e) {
                ErrorLog.error("work release exception ", e);
            } finally {
                latch.countDown();
            }
        }
    }
}

onWebResponse 方法中,当发现request.getRid() == response.getRid()时会释放锁,并标志 receiveResulttrue
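3. workContext.getServerChannel().writeAndFlush sends the task-execution message to the master. The previous article, "Hera source code analysis: the distributed lock at project startup", explained when workContext's serverChannel is set.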
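Together, these pieces implement request/response correlation over an asynchronous channel: a unique rid per request, a listener registered before the message goes out, and a latch-guarded task that yields the matching response or gives up after a timeout. The sketch below reproduces that pattern with plain JDK classes. Request, Response, Listener, send and dispatch are hypothetical stand-ins for the protobuf messages, WorkResponseListener and the netty handler, and the "transport" is a callback rather than a socket.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class LatchCorrelationSketch {

    record Request(int rid, String body) {}
    record Response(int rid, String status) {}

    // One listener per in-flight request (plays the role of WorkResponseListener).
    static final class Listener {
        final Request request;
        final CountDownLatch latch = new CountDownLatch(1);
        volatile boolean received = false;   // like receiveResult
        volatile Response response;          // like webResponse

        Listener(Request request) {
            this.request = request;
        }

        void onResponse(Response r) {
            if (r.rid() != request.rid()) {
                return;                       // reply to another request: ignore
            }
            try {
                response = r;
                received = true;
            } finally {
                latch.countDown();            // always wake the waiting task
            }
        }
    }

    static final AtomicInteger RID = new AtomicInteger();   // like AtomicIncrease
    static final List<Listener> listeners = new CopyOnWriteArrayList<>();
    static final ExecutorService pool = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);                    // don't keep the JVM alive after main returns
        return t;
    });

    // Network-layer callback: offer every incoming response to every listener.
    static void dispatch(Response r) {
        for (Listener l : listeners) {
            l.onResponse(r);
        }
    }

    // Registers a listener, hands the request to the transport, and returns a Future
    // yielding the matching response, or null if none arrives within timeoutMs.
    static Future<Response> send(String body, long timeoutMs, Consumer<Request> transport) {
        Request req = new Request(RID.getAndIncrement(), body);
        Listener listener = new Listener(req);
        listeners.add(listener);              // register before sending
        Future<Response> future = pool.submit(() -> {
            listener.latch.await(timeoutMs, TimeUnit.MILLISECONDS);
            listeners.remove(listener);       // clean up whether or not a reply came
            return listener.received ? listener.response : null;
        });
        transport.accept(req);                // like writeAndFlush to the master
        return future;
    }

    public static void main(String[] args) throws Exception {
        // Fake master: answers asynchronously with the same rid.
        Consumer<Request> fakeMaster = req -> pool.submit(() -> dispatch(new Response(req.rid(), "OK")));
        System.out.println(send("ExecuteJob:1", 1000, fakeMaster).get(2, TimeUnit.SECONDS)); // Response[rid=0, status=OK]
        // Silent master: the waiting task gives up after 100 ms and yields null.
        System.out.println(send("ExecuteJob:2", 100, req -> { }).get(2, TimeUnit.SECONDS));  // null
    }
}
```

Registering the listener before handing the request to the transport, as buildMessage does with addListener before writeAndFlush, means that even a very fast reply still finds its listener.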

Master side

On the master, every netty message is handled by MasterHandler, so the execute request sent by the worker above ends up in MasterHandler#channelRead.

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SocketMessage socketMessage = (SocketMessage) msg;
        Channel channel = ctx.channel();
        switch (socketMessage.getKind()) {
            // some code omitted
            case WEB_REQUEST:
                final WebRequest webRequest = WebRequest.newBuilder().mergeFrom(socketMessage.getBody()).build();
                switch (webRequest.getOperate()) {
                    case ExecuteJob:
                        completionService.submit(() ->
                                new ChannelResponse(FailBackCluster.wrap(channel), MasterHandlerWebResponse.handleWebExecute(masterContext, webRequest)));
                        break;
                    // some code omitted
                }
                break;
            // some code omitted
        }
    }

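MasterHandler hands the worker's execute request off asynchronously to MasterHandlerWebResponse.handleWebExecute through completionService. The submitted task returns a ChannelResponse that pairs the result with the channel wrapped by FailBackCluster, a fault-tolerance wrapper that retries on failure.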
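The article only shows the submit side. One common way to consume such a CompletionService, assumed here rather than taken from hera, is a loop that takes finished results in completion order and writes each response back on its own channel. In the sketch, retrying() is a hypothetical stand-in for FailBackCluster's retry-on-failure wrapping, and Channel and ChannelResponse are simplified types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceSketch {

    // Simplified channel whose write may fail transiently.
    interface Channel {
        void write(String msg) throws Exception;
    }

    // Pairs the reply channel with the computed response (like ChannelResponse).
    record ChannelResponse(Channel channel, String response) {}

    // Hypothetical fail-back wrapper: retry a failed write up to maxAttempts times.
    static Channel retrying(Channel inner, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        return msg -> {
            Exception last = null;
            for (int i = 0; i < maxAttempts; i++) {
                try {
                    inner.write(msg);
                    return;
                } catch (Exception e) {
                    last = e;
                }
            }
            throw last;
        };
    }

    // Handles each request asynchronously, then writes replies back in completion order.
    static void serve(List<String> requests, Channel rawChannel) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<ChannelResponse> completion = new ExecutorCompletionService<>(pool);
        Channel channel = retrying(rawChannel, 3);
        try {
            for (String req : requests) {
                // handler stand-in; in hera this is MasterHandlerWebResponse.handleWebExecute
                completion.submit(() -> new ChannelResponse(channel, "OK:" + req));
            }
            for (int i = 0; i < requests.size(); i++) {
                ChannelResponse done = completion.take().get();   // blocks until some task finishes
                try {
                    done.channel().write(done.response());
                } catch (Exception e) {
                    System.err.println("reply lost after retries: " + e.getMessage());
                }
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> delivered = new ArrayList<>();
        int[] calls = {0};
        // Flaky channel: every other write attempt fails, so each reply needs one retry.
        Channel flaky = msg -> {
            if (calls[0]++ % 2 == 0) {
                throw new Exception("transient failure");
            }
            delivered.add(msg);
        };
        serve(List.of("a", "b", "c"), flaky);
        Collections.sort(delivered);
        System.out.println(delivered);   // [OK:a, OK:b, OK:c]
    }
}
```

Because the netty I/O thread only submits work, a slow handler does not block other incoming messages; replies simply go out whenever their tasks finish.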

    public static WebResponse handleWebExecute(MasterContext context, WebRequest request) {
        if (request.getEk() == ExecuteKind.ManualKind || request.getEk() == ExecuteKind.ScheduleKind) {
            Long historyId = Long.parseLong(request.getId());
            HeraJobHistory heraJobHistory = context.getHeraJobHistoryService().findById(historyId);
            HeraJobHistoryVo history = BeanConvertUtils.convert(heraJobHistory);
            context.getMaster().run(history, context.getHeraJobService().findById(history.getJobId()));
            WebResponse webResponse = WebResponse.newBuilder()
                    .setRid(request.getRid())
                    .setOperate(WebOperate.ExecuteJob)
                    .setStatus(Status.OK)
                    .build();
            TaskLog.info("MasterHandlerWebResponse: send web execute response, actionId =  ", history.getJobId());
            return webResponse;
        } else if (request.getEk() == ExecuteKind.DebugKind) {
            Long debugId = Long.parseLong(request.getId());
            HeraDebugHistoryVo debugHistory = context.getHeraDebugHistoryService().findById(debugId);
            TaskLog.info("2-1.MasterHandlerWebResponse: receive web debug response, debugId = " + debugId);
            context.getMaster().debug(debugHistory);

            WebResponse webResponse = WebResponse.newBuilder()
                    .setRid(request.getRid())
                    .setOperate(WebOperate.ExecuteJob)
                    .setStatus(Status.OK)
                    .build();
            TaskLog.info("2-2.MasterHandlerWebResponse : send web debug response, debugId = ", debugId);
            return webResponse;
        }
        return WebResponse.newBuilder()
                .setRid(request.getRid())
                .setErrorText("Unrecognized operation type: " + request.getEk())
                .setStatus(Status.ERROR)
                .build();
    }

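Here request.getEk() determines whether the execution comes from the development center or from the scheduling center. For a manual recovery the value is ExecuteKind.ManualKind, so we only need the if branch: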

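  • First, look up the record inserted on the worker side, using the id of the hera_action_history record.

  • Call Master#run with that history record and the HeraJob loaded by its jobId.

  • Build a webResponse with status OK to signal that the task was accepted.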
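The branching can be reduced to a small, testable function. In this sketch the enums and records are simplified stand-ins for the protobuf types (only the kinds that appear in the article are listed), and the two LongConsumer callbacks replace master.run and master.debug. The point it shows is that every reply, OK or ERROR, echoes the request's rid, which is what the worker-side listener matches on.

```java
import java.util.function.LongConsumer;

public class ExecuteDispatchSketch {

    // Simplified stand-ins for the protobuf enums and messages (hypothetical shapes).
    enum ExecuteKind { ManualKind, ScheduleKind, DebugKind }
    enum Status { OK, ERROR }
    record WebRequest(int rid, ExecuteKind ek, String id) {}
    record WebResponse(int rid, Status status, String errorText) {}

    // Mirrors the if / else-if / fallback structure of handleWebExecute.
    static WebResponse handle(WebRequest request, LongConsumer run, LongConsumer debug) {
        if (request.ek() == ExecuteKind.ManualKind || request.ek() == ExecuteKind.ScheduleKind) {
            run.accept(Long.parseLong(request.id()));     // hera: load history and job, then master.run(...)
            return new WebResponse(request.rid(), Status.OK, "");
        } else if (request.ek() == ExecuteKind.DebugKind) {
            debug.accept(Long.parseLong(request.id()));   // hera: load debug history, then master.debug(...)
            return new WebResponse(request.rid(), Status.OK, "");
        }
        // Errors also echo the rid so the worker's listener can still match the reply.
        return new WebResponse(request.rid(), Status.ERROR, "Unrecognized operation type: " + request.ek());
    }

    public static void main(String[] args) {
        WebResponse ok = handle(new WebRequest(7, ExecuteKind.ManualKind, "1001"),
                id -> System.out.println("run history " + id),
                id -> System.out.println("debug " + id));
        System.out.println(ok.status());   // OK
        // A request with no recognizable kind (null here) falls through to ERROR.
        System.out.println(handle(new WebRequest(8, null, "1"), id -> { }, id -> { }).status());   // ERROR
    }
}
```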

The run method

    public void run(HeraJobHistoryVo heraJobHistory, HeraJob heraJob) {
        Long actionId = heraJobHistory.getActionId();
        // duplicate job check
        // 1: return if the job is already queued or running
        if (checkJobExists(heraJobHistory, false)) {
            return;
        }
        HeraAction heraAction = masterContext.getHeraJobActionService().findById(actionId);
        Set<String> areaList = areaList(heraJob.getAreaId());
        // 2: a job that does not belong to this execution area is marked as successful directly
        if (!areaList.contains(HeraGlobalEnv.getArea()) && !areaList.contains(Constants.ALL_AREA)) {
            ScheduleLog.info("Job not in this area, set to success directly:", HeraGlobalEnv.getArea(), heraJob.getId());
            heraAction.setLastResult(heraAction.getStatus());
            heraAction.setStatus(StatusEnum.SUCCESS.toString());
            heraAction.setHistoryId(heraJobHistory.getId());
            heraAction.setReadyDependency("");
            String host = "localhost";
            heraAction.setHost(host);
            Date endTime = new Date();
            heraAction.setStatisticStartTime(endTime);
            heraAction.setStatisticEndTime(endTime);
            masterContext.getHeraJobActionService().update(heraAction);
            heraJobHistory.getLog().append("Job not in area " + HeraGlobalEnv.getArea() + ", set to success directly");
            heraJobHistory.setStatusEnum(StatusEnum.SUCCESS);
            heraJobHistory.setEndTime(endTime);
            heraJobHistory.setStartTime(endTime);
            heraJobHistory.setExecuteHost(host);
            masterContext.getHeraJobHistoryService().update(BeanConvertUtils.convert(heraJobHistory));
            HeraJobSuccessEvent successEvent = new HeraJobSuccessEvent(actionId, heraJobHistory.getTriggerType(), heraJobHistory);
            masterContext.getDispatcher().forwardEvent(successEvent);
            return;
        }

        // 3. First, …
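The two guards at the start of run can be sketched in isolation. The article does not show checkJobExists or the value of Constants.ALL_AREA, so this sketch uses a concurrent set of action ids as a stand-in for the "queued or running" check and a hypothetical "all" marker for the all-areas value; the Decision enum and the area names in main are likewise hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class RunGuardSketch {

    static final String ALL_AREA = "all";   // hypothetical value for Constants.ALL_AREA

    enum Decision { SKIP_DUPLICATE, MARK_SUCCESS, RUN }

    // Stand-in for "already queued or running" (checkJobExists is not shown in the article).
    private final Set<Long> queuedOrRunning = ConcurrentHashMap.newKeySet();

    Decision decide(long actionId, Set<String> jobAreas, String currentArea) {
        // 1: duplicate check first
        if (queuedOrRunning.contains(actionId)) {
            return Decision.SKIP_DUPLICATE;
        }
        // 2: jobs outside this area (and not marked for all areas) are not run here
        if (!jobAreas.contains(currentArea) && !jobAreas.contains(ALL_AREA)) {
            return Decision.MARK_SUCCESS;
        }
        // add() returns false if another thread claimed the same action in the meantime
        return queuedOrRunning.add(actionId) ? Decision.RUN : Decision.SKIP_DUPLICATE;
    }

    // Called when an action finishes so it can be triggered again.
    void finish(long actionId) {
        queuedOrRunning.remove(actionId);
    }

    public static void main(String[] args) {
        RunGuardSketch guard = new RunGuardSketch();
        System.out.println(guard.decide(1L, Set.of("hz"), "hz"));   // RUN
        System.out.println(guard.decide(1L, Set.of("hz"), "hz"));   // SKIP_DUPLICATE
        System.out.println(guard.decide(2L, Set.of("bj"), "hz"));   // MARK_SUCCESS
    }
}
```

Using the return value of add() for the final claim avoids the check-then-act race that a separate contains() followed by add() would have under concurrent triggers.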
