hera Source Code Analysis: The Execution Flow of a Task Trigger
Posted by scx_white
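In hera, a task can be triggered in several ways: manually by an analyst from the front end, by the scheduler, by an upstream dependency, by a rerun, by lost-signal recovery, and so on. Whichever way is used, the final entry point is the Master#run method (for development-center tasks the entry point is Master#debug).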
This article walks through the trigger flow for a manually executed task.
Triggering a task
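In the latest version there are three kinds of manual trigger: manual execution, manual recovery, and super recovery. The differences are not covered here; see the hera operation manual. This article uses manual recovery as the example.
When we click Execute, a request is sent to the backend.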
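To make the three manual trigger types concrete, here is a minimal sketch of an id-to-type lookup. The ids 2, 3 and 6 come from the @ApiParam description on the controller's triggerType parameter shown later in this article. The enum name ManualTriggerType, its constants and the parse helper are hypothetical and are not hera's actual enum.

```java
import java.util.Optional;

/** Hypothetical stand-in for hera's trigger-type enum; only the ids come from the article. */
enum ManualTriggerType {
    MANUAL(2),          // manual execution
    MANUAL_RECOVER(3),  // manual recovery
    SUPER_RECOVER(6);   // super recovery

    private final int id;

    ManualTriggerType(int id) {
        this.id = id;
    }

    int getId() {
        return id;
    }

    /** Returns the matching type, or empty when the id is null or not a manual trigger. */
    static Optional<ManualTriggerType> parse(Integer id) {
        if (id == null) {
            return Optional.empty();
        }
        for (ManualTriggerType type : values()) {
            if (type.id == id) {
                return Optional.of(type);
            }
        }
        return Optional.empty();
    }
}
```

Returning an Optional forces the caller to decide what to do with an unknown id instead of silently passing null along.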
Work side
First, look at the stack trace on the work side:
writeAndFlush:28, NettyChannel (com.dfire.core.netty)
writeAndFlush:32, FailFastCluster (com.dfire.core.netty.cluster)
buildMessage:100, WorkerHandleWebRequest (com.dfire.core.netty.worker.request)
handleWebExecute:29, WorkerHandleWebRequest (com.dfire.core.netty.worker.request)
executeJobFromWeb:312, WorkClient (com.dfire.core.netty.worker)
execute:409, ScheduleOperatorController (com.dfire.controller)
invoke:-1, ScheduleOperatorController$$FastClassBySpringCGLIB$$cddb34c8 (com.dfire.controller)
invoke:204, MethodProxy (org.springframework.cglib.proxy)
invokeJoinpoint:738, CglibAopProxy$CglibMethodInvocation (org.springframework.aop.framework)
proceed:157, ReflectiveMethodInvocation (org.springframework.aop.framework)
proceed:85, MethodInvocationProceedingJoinPoint (org.springframework.aop.aspectj)
around:72, HeraAspect (com.dfire.config)
// remaining frames omitted
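The stack trace shows that before the controller method is invoked, HeraAspect#around runs first; it is a permission check implemented with AOP. Once the check passes, ScheduleOperatorController#execute is called, and that method is the entry point for task execution. After that, WorkerHandleWebRequest#handleWebExecute and WorkerHandleWebRequest#buildMessage build the netty message body, and finally the message is sent to the master through a fail-fast fault-tolerance strategy (FailFastCluster#writeAndFlush).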
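The idea of an around advice that guards the controller can be imitated in plain Java. The sketch below is not HeraAspect itself and uses no Spring classes; PermissionAround, its allowedUsers set and the user names are assumptions made purely for illustration. It only shows the shape: check first, then let the target proceed.

```java
import java.util.Set;
import java.util.function.Supplier;

/** Plain-Java imitation of an "around" advice; illustrative only, no Spring involved. */
final class PermissionAround {
    private final Set<String> allowedUsers; // hypothetical permission store

    PermissionAround(Set<String> allowedUsers) {
        this.allowedUsers = allowedUsers;
    }

    /** Runs the target only if the user passes the check, otherwise rejects the call. */
    <T> T around(String user, Supplier<T> target) {
        if (!allowedUsers.contains(user)) {
            throw new SecurityException("no permission: " + user);
        }
        // Plays the role of joinPoint.proceed() in a real advice.
        return target.get();
    }
}
```

With this shape the target (here a Supplier, in hera the controller method) never needs to know that a check happened before it.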
Now look at the controller entry point in detail:
@RequestMapping(value = "/manual", method = RequestMethod.GET)
@ResponseBody
@ApiOperation("手动执行接口")
// Parameters: actionId = version id; triggerType = 2 manual execution, 3 manual recovery, 6 super recovery;
// execUser = task execution group
public JsonResponse execute(@JsonSerialize(using = ToStringSerializer.class) @ApiParam(value = "版本id", required = true) Long actionId,
                            @ApiParam(value = "触发类型,2手动执行,3手动恢复,6超级恢复", required = true) Integer triggerType,
                            @RequestParam(required = false) @ApiParam(value = "任务执行组", required = false) String execUser)
        throws InterruptedException, ExecutionException, HeraException, TimeoutException {
    // validation code omitted
    String configs = heraJob.getConfigs();
    // create a hera_action_history object and insert the execution record into MySQL
    HeraJobHistory actionHistory = HeraJobHistory.builder().build();
    actionHistory.setJobId(heraAction.getJobId());
    actionHistory.setActionId(heraAction.getId());
    actionHistory.setTriggerType(triggerTypeEnum.getId());
    actionHistory.setOperator(heraJob.getOwner());
    actionHistory.setIllustrate(execUser);
    actionHistory.setStatus(StatusEnum.RUNNING.toString());
    actionHistory.setStatisticEndTime(heraAction.getStatisticEndTime());
    actionHistory.setHostGroupId(heraAction.getHostGroupId());
    heraJobHistoryService.insert(actionHistory);
    heraAction.setScript(heraJob.getScript());
    heraAction.setHistoryId(actionHistory.getId());
    heraAction.setConfigs(configs);
    heraAction.setAuto(heraJob.getAuto());
    heraAction.setHostGroupId(heraJob.getHostGroupId());
    heraJobActionService.update(heraAction);
    // send the task-execution request to the master
    workClient.executeJobFromWeb(JobExecuteKind.ExecuteKind.ManualKind, actionHistory.getId());
    String ownerId = getOwnerId();
    if (ownerId == null) {
        ownerId = "0";
    }
    // add an operation record
    addJobRecord(heraJob.getId(), String.valueOf(actionId), RecordTypeEnum.Execute, execUser, ownerId);
    return new JsonResponse(true, String.valueOf(actionId));
}
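This code is straightforward and has three parts:
1. Create a hera_action_history object and insert the task's execution record into MySQL.
2. Send a task-execution message to the master over netty.
3. Add an operation record for the task.
The second part is the one that matters. Following the stack trace further: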
public void executeJobFromWeb(ExecuteKind kind, Long id) throws ExecutionException, InterruptedException, TimeoutException {
    RpcWebResponse.WebResponse response = WorkerHandleWebRequest.handleWebExecute(workContext, kind, id).get(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS);
    if (response.getStatus() == ResponseStatus.Status.ERROR) {
        ErrorLog.error(response.getErrorText());
    }
}
This code calls WorkerHandleWebRequest.handleWebExecute, which returns a future, and then blocks the request on future.get for at most HeraGlobalEnv.getRequestTimeout() seconds. Next:
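For readers less familiar with timed Future.get, here is a small self-contained sketch of blocking on a future with a timeout. Note one deliberate difference: hera's executeJobFromWeb lets TimeoutException and the other exceptions propagate to the caller, whereas this sketch swaps in a fallback value so that it can be run in isolation. BlockingCall and awaitOrFallback are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingCall {
    /** Waits up to timeoutMillis for the future; returns fallback if no result arrives in time. */
    static String awaitOrFallback(Future<String> future, long timeoutMillis, String fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // stop waiting for a reply that is too late
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
            return fallback;
        } catch (ExecutionException e) {
            return fallback; // the task itself failed
        }
    }
}
```

The timeout is what keeps a web request from hanging forever when the other side never answers.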
public static Future<WebResponse> handleWebExecute(final WorkContext workContext, ExecuteKind kind, Long id) {
    // the last argument is the error text logged when the master does not reply within the timeout
    return buildMessage(WebRequest.newBuilder()
            .setRid(AtomicIncrease.getAndIncrement())
            .setOperate(WebOperate.ExecuteJob)
            .setEk(kind)
            .setId(String.valueOf(id))
            .build(), workContext, "[执行]-任务超出" + HeraGlobalEnv.getRequestTimeout() + "秒未得到master消息返回:" + id);
}

private static Future<WebResponse> buildMessage(WebRequest request, WorkContext workContext, String errorMsg) {
    CountDownLatch latch = new CountDownLatch(1);
    WorkResponseListener responseListener = new WorkResponseListener(request, false, latch, null);
    workContext.getHandler().addListener(responseListener);
    // wait asynchronously for the listener to receive the master's response
    Future<WebResponse> future = workContext.getWorkWebThreadPool().submit(() -> {
        latch.await(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS);
        if (!responseListener.getReceiveResult()) {
            ErrorLog.error(errorMsg);
        }
        workContext.getHandler().removeListener(responseListener);
        return responseListener.getWebResponse();
    });
    try {
        // send the request to the master
        workContext.getServerChannel().writeAndFlush(SocketMessage.newBuilder()
                .setKind(SocketMessage.Kind.WEB_REQUEST)
                .setBody(request.toByteString())
                .build());
        SocketLog.info("1.WorkerHandleWebRequest: send web request to master requestId ={}", request.getRid());
    } catch (RemotingException e) {
        workContext.getHandler().removeListener(responseListener);
        ErrorLog.error("1.WorkerHandleWebRequest: send web request to master exception requestId =" + request.getRid(), e);
    }
    return future;
}
handleWebExecute builds a WebRequest object. Note that its operate field is WebOperate.ExecuteJob and its id is the id of the hera_action_history record.
Three pieces of buildMessage are worth a closer look:
1. CountDownLatch latch = new CountDownLatch(1);
A task submitted to the thread pool waits on this latch asynchronously, and the latch is released in WorkResponseListener.
2. WorkResponseListener responseListener = new WorkResponseListener(request, false, latch, null);
public class WorkResponseListener extends ResponseListenerAdapter {
    private RpcWebRequest.WebRequest request;
    private volatile Boolean receiveResult;
    private CountDownLatch latch;
    private RpcWebResponse.WebResponse webResponse;

    @Override
    public void onWebResponse(RpcWebResponse.WebResponse response) {
        if (request.getRid() == response.getRid()) {
            try {
                webResponse = response;
                receiveResult = true;
            } catch (Exception e) {
                ErrorLog.error("work release exception ", e);
            } finally {
                latch.countDown();
            }
        }
    }
}
In onWebResponse, when request.getRid() == response.getRid(), the listener stores the response, sets receiveResult to true, and releases the latch.
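3. The call to workContext.getServerChannel().writeAndFlush sends the task-execution message to the master. The previous article, "hera源码剖析:项目启动之分布式锁" (on the distributed lock at project startup), already covered when serverChannel is set on workContext.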
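The three pieces above (a latch, a listener keyed by request id, and a timed wait) can be sketched together in a simplified, self-contained form. This is not hera's code: RidCorrelation, Listener, register, dispatch and await are hypothetical names, and the network is replaced by a direct call to dispatch.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified request/response correlation by request id (rid); all names are illustrative. */
final class RidCorrelation {
    static final AtomicInteger NEXT_RID = new AtomicInteger();
    static final List<Listener> LISTENERS = new CopyOnWriteArrayList<>();

    static final class Listener {
        final int rid;
        final CountDownLatch latch = new CountDownLatch(1);
        volatile String response; // stays null until the matching response arrives

        Listener(int rid) {
            this.rid = rid;
        }

        void onResponse(int responseRid, String body) {
            if (responseRid == rid) { // ignore responses that belong to other requests
                response = body;
                latch.countDown();    // wake up the waiting caller
            }
        }
    }

    /** Creates a listener with a fresh rid and registers it. */
    static Listener register() {
        Listener listener = new Listener(NEXT_RID.getAndIncrement());
        LISTENERS.add(listener);
        return listener;
    }

    /** Stands in for the network thread: offers an incoming response to every listener. */
    static void dispatch(int rid, String body) {
        for (Listener listener : LISTENERS) {
            listener.onResponse(rid, body);
        }
    }

    /** Waits up to timeoutMillis, then unregisters the listener; returns null on timeout. */
    static String await(Listener listener, long timeoutMillis) {
        try {
            listener.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            LISTENERS.remove(listener);
        }
        return listener.response;
    }
}
```

Because every listener sees every response, the rid comparison is what keeps concurrent requests from stealing each other's replies.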
Master side
On the master, every netty message is handled by MasterHandler, so the execution request sent by the work node ends up in MasterHandler#channelRead:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    SocketMessage socketMessage = (SocketMessage) msg;
    Channel channel = ctx.channel();
    switch (socketMessage.getKind()) {
        // some code omitted
        case WEB_REQUEST:
            final WebRequest webRequest = WebRequest.newBuilder().mergeFrom(socketMessage.getBody()).build();
            switch (webRequest.getOperate()) {
                case ExecuteJob:
                    completionService.submit(() ->
                            new ChannelResponse(FailBackCluster.wrap(channel), MasterHandlerWebResponse.handleWebExecute(masterContext, webRequest)));
                    break;
                // some code omitted
            }
            // some code omitted
    }
}
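MasterHandler hands the work node's execution request to MasterHandlerWebResponse.handleWebExecute asynchronously, and returns the result together with a channel wrapped for retry on failure (FailBackCluster.wrap(channel)).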
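The article mentions two fault-tolerance styles: fail-fast on the work side and retry-on-failure on the master side. The sketch below contrasts the two ideas in generic form. It is an assumption about the general technique, not a reproduction of FailFastCluster or FailBackCluster, whose internals are not shown in this article; SendPolicies, failFast, failBack and maxAttempts are hypothetical names.

```java
import java.util.function.Supplier;

final class SendPolicies {
    /** Fail-fast: a single attempt; any exception propagates to the caller immediately. */
    static <T> T failFast(Supplier<T> send) {
        return send.get();
    }

    /** Retry-on-failure: up to maxAttempts tries; rethrows the last exception if all fail. */
    static <T> T failBack(Supplier<T> send, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return send.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }
}
```

Fail-fast suits an interactive request where the user can simply click again, while retrying suits a reply that should not be lost because of one failed write.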
public static WebResponse handleWebExecute(MasterContext context, WebRequest request) {
    if (request.getEk() == ExecuteKind.ManualKind || request.getEk() == ExecuteKind.ScheduleKind) {
        // scheduling-center task: load the history record and hand it to Master#run
        Long historyId = Long.parseLong(request.getId());
        HeraJobHistory heraJobHistory = context.getHeraJobHistoryService().findById(historyId);
        HeraJobHistoryVo history = BeanConvertUtils.convert(heraJobHistory);
        context.getMaster().run(history, context.getHeraJobService().findById(history.getJobId()));
        WebResponse webResponse = WebResponse.newBuilder()
                .setRid(request.getRid())
                .setOperate(WebOperate.ExecuteJob)
                .setStatus(Status.OK)
                .build();
        TaskLog.info("MasterHandlerWebResponse: send web execute response, actionId = {}", history.getJobId());
        return webResponse;
    } else if (request.getEk() == ExecuteKind.DebugKind) {
        // development-center task: hand the debug record to Master#debug
        Long debugId = Long.parseLong(request.getId());
        HeraDebugHistoryVo debugHistory = context.getHeraDebugHistoryService().findById(debugId);
        TaskLog.info("2-1.MasterHandlerWebResponse: receive web debug response, debugId = " + debugId);
        context.getMaster().debug(debugHistory);
        WebResponse webResponse = WebResponse.newBuilder()
                .setRid(request.getRid())
                .setOperate(WebOperate.ExecuteJob)
                .setStatus(Status.OK)
                .build();
        TaskLog.info("2-2.MasterHandlerWebResponse : send web debug response, debugId = {}", debugId);
        return webResponse;
    }
    // unrecognized execute kind: reply with an error
    return WebResponse.newBuilder()
            .setRid(request.getRid())
            .setErrorText("未识别的操作类型" + request.getEk())
            .setStatus(Status.ERROR)
            .build();
}
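Here request.getEk() decides whether the request is a development-center execution or a scheduling-center execution. For manual recovery the value is ExecuteKind.ManualKind, so only the if branch matters: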
1. Look up, by its hera_action_history id, the record that was inserted on the work side.
2. Call the master#run method.
3. Build a webResponse object that reports the task execution as OK.
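Stripped of the hera types, the branching by execute kind reduces to a small dispatch. The sketch below is a simplified model under stated assumptions: the Master interface, the Response class and the Unknown constant are hypothetical stand-ins, and only the kind names ManualKind, ScheduleKind and DebugKind and the OK/ERROR statuses come from the listing.

```java
final class ExecuteDispatch {
    enum ExecuteKind { ManualKind, ScheduleKind, DebugKind, Unknown } // Unknown is hypothetical
    enum Status { OK, ERROR }

    /** Hypothetical stand-in for the two master entry points named in the article. */
    interface Master {
        void run(long historyId);
        void debug(long debugId);
    }

    static final class Response {
        final int rid;
        final Status status;
        final String errorText;

        Response(int rid, Status status, String errorText) {
            this.rid = rid;
            this.status = status;
            this.errorText = errorText;
        }
    }

    /** Routes the request by kind and echoes the rid so the caller can match the reply. */
    static Response handle(Master master, int rid, ExecuteKind kind, String id) {
        switch (kind) {
            case ManualKind:
            case ScheduleKind:
                master.run(Long.parseLong(id));   // scheduling-center execution
                return new Response(rid, Status.OK, "");
            case DebugKind:
                master.debug(Long.parseLong(id)); // development-center execution
                return new Response(rid, Status.OK, "");
            default:
                return new Response(rid, Status.ERROR, "unrecognized operation type " + kind);
        }
    }
}
```

Echoing the rid in every branch, including the error branch, is what lets the waiting side release its latch even when the request is rejected.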
The run method
public void run(HeraJobHistoryVo heraJobHistory, HeraJob heraJob) {
    Long actionId = heraJobHistory.getActionId();
    // duplicate-job detection
    // 1: check whether the task is already queued or running
    if (checkJobExists(heraJobHistory, false)) {
        return;
    }
    HeraAction heraAction = masterContext.getHeraJobActionService().findById(actionId);
    Set<String> areaList = areaList(heraJob.getAreaId());
    // 2: a task that does not belong to this execution area is marked successful without running
    if (!areaList.contains(HeraGlobalEnv.getArea()) && !areaList.contains(Constants.ALL_AREA)) {
        ScheduleLog.info("非区域任务,直接设置为成功:", HeraGlobalEnv.getArea(), heraJob.getId());
        heraAction.setLastResult(heraAction.getStatus());
        heraAction.setStatus(StatusEnum.SUCCESS.toString());
        heraAction.setHistoryId(heraJobHistory.getId());
        heraAction.setReadyDependency("");
        String host = "localhost";
        heraAction.setHost(host);
        Date endTime = new Date();
        heraAction.setStatisticStartTime(endTime);
        heraAction.setStatisticEndTime(endTime);
        masterContext.getHeraJobActionService().update(heraAction);
        heraJobHistory.getLog().append("非" + HeraGlobalEnv.getArea() + "区域任务,直接设置为成功");
        heraJobHistory.setStatusEnum(StatusEnum.SUCCESS);
        heraJobHistory.setEndTime(endTime);
        heraJobHistory.setStartTime(endTime);
        heraJobHistory.setExecuteHost(host);
        masterContext.getHeraJobHistoryService().update(BeanConvertUtils.convert(heraJobHistory));
        // publish a success event so that downstream handling still fires
        HeraJobSuccessEvent successEvent = new HeraJobSuccessEvent(actionId, heraJobHistory.getTriggerType(), heraJobHistory);
        masterContext.getDispatcher().forwardEvent(successEvent);
        return;
    }
    // 3. 先在数... (the listing is cut off at this point in the source)
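Step 2 of the run method is easier to read when the area condition is inverted into a positive predicate. The sketch below applies De Morgan's law to the condition in the listing. AreaCheck and runsHere are hypothetical names, and the value "all" for ALL_AREA is an assumption, since the article does not show what Constants.ALL_AREA contains.

```java
import java.util.Set;

final class AreaCheck {
    // Hypothetical value; the article does not show the actual content of Constants.ALL_AREA.
    static final String ALL_AREA = "all";

    /** True when a job tagged with jobAreas should actually run in currentArea. */
    static boolean runsHere(Set<String> jobAreas, String currentArea) {
        return jobAreas.contains(currentArea) || jobAreas.contains(ALL_AREA);
    }
}
```

In the listing, the branch that marks the task successful without running it corresponds to runsHere returning false.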