一道任务编排服务面试题解析
Posted 5ycode
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一道任务编排服务面试题解析相关的知识,希望对你有一定的参考价值。
6月份的时候,群里有一个小伙伴分享了一道面试题,如下图:
趁着下班的功夫,拆解下这道面试题
- 可以按顺序添加任意节点形成一个任务链
- 链表且有序 或者数组(考虑到数组的扩容,直接pass调,用有序链表LinkedList)
- 添加任意节点(可以指定索引添加)
- 任务需要按顺序执行,同一个节点可能有多个任务(可以并行执行)
- 线程池执行
- 同一个节点的任务,广度优先,可以利用Queue
- 节点完成可以指定任意个数
- Queue需要包装起来
- 同时需要计数,直接利用CountDownLatch
- 每个节点都可能失败,失败后可以通过接口进行重试
- 持久化保存节点任务执行状态
- 如果强依赖,必须按顺序再走一遍,成功的可以忽略,只执行失败的
- 只将失败任务重新串联起来
- 实现监控功能,可以监控当前任务状态
- 最简单的日志输出
- 如果有kibana,直接filebeat抽取日志到es,然后在kibana里配置视图,或者定时从es里抽取失败任务,做钉钉或短信报警
- 也可以filebeat抽取到kafka,然后消费处理,报警
- 隐含的,还有一层,任务
- 可以抽象出来,业务自定义去实现
ok,分析完了,开干。
定义任务并编写执行示例
/**
* 任务接口
* @Author: yxkong
* @Date: 2022/6/1 6:24 PM
* @version: 1.0
*/
public interface ITask
/**
* 任务执行入口
* @return
*/
TaskResult execute();
/**
* 任务示例,具体的任务实现
* @Author: yxkong
* @Date: 2022/6/1 6:55 PM
* @version: 1.0
*/
public class TaskExample implements ITask
private static Random random = new Random();
//业务数据
private String name;
private int status;
@Override
public TaskResult execute()
System.out.println(String.format("%s task name:%s starting",Thread.currentThread().getName(), this.name));
//TODO 执行具体的业务逻辑
try
//添加随机等待,确保同一层级不使用一个线程
Thread.sleep(Long.valueOf(random.nextInt(500)));
catch (InterruptedException e)
e.printStackTrace();
if (status == 1)
System.out.println(String.format("%s task name:%s 任务执行成功",Thread.currentThread().getName(), this.name));
return new TaskResult(1,"任务执行成功");
else
System.out.println(String.format("%s task name:%s 任务执行失败",Thread.currentThread().getName(), this.name));
//模拟失败后,再次要执行的任务是成功的(正常,直接添加即可)
this.status = 1;
return new TaskResult(0,"任务执行失败!",this);
public TaskExample(String name, int status)
this.name = name;
this.status = status;
@Override
public String toString()
return "ITask" +
"name='" + name + '\\'' +
", status=" + status +
'';
定义任务编排链路
先定义一个任务包装体
/**
* 任务包装体
* @Author: yxkong
* @Date: 2022/6/1 6:37 PM
* @version: 1.0
*/
public class TaskQueueWrap
private Queue<ITask> queue ;
//至少完成个数
private int atLeast;
private TaskQueueWrap(int atLeast)
this.queue = new ArrayDeque<>();
this.atLeast = atLeast;
public Queue<ITask> getQueue()
return queue;
public int getAtLeast()
return atLeast;
public boolean add(ITask task)
return this.queue.add(task);
public static TaskQueueWrap create(List<ITask> tasks,int atLeast)
TaskQueueWrap queue = new TaskQueueWrap(atLeast);
for (ITask task:tasks)
queue.add(task);
return queue;
public static TaskQueueWrap createEmpty(int atLeast)
return new TaskQueueWrap(atLeast);
定义任务链,并暴露各种操作方法,包括
addNew(...)
添加新层级任务(单个或批量)addLast(...)
在最后一个层级上添加任务addIdx(...)
指定层级添加任务execute()
链路执行
/**
* 任务链
* @Author: yxkong
* @Date: 2022/6/1 6:34 PM
* @version: 1.0
*/
public class TaskChain
private LinkedList<TaskQueueWrap> chain;
private LinkedList<TaskCounter> counters;
public TaskChain()
this.chain = new LinkedList<>();
this.counters = new LinkedList<>();
/**
* 在最后一个层级上添加任务
* @param task
* @return
*/
public boolean addLast(ITask task)
TaskQueueWrap queue = chain.getLast();
if (Objects.isNull(task) || Objects.isNull(queue))
return Boolean.FALSE;
queue.add(task);
return Boolean.TRUE;
/**
* 添加新层级任务
* @param task
* @return
*/
public boolean addNew(ITask task, int atLeast)
if (Objects.isNull(task))
return Boolean.FALSE;
TaskQueueWrap queue =TaskQueueWrap.createEmpty(atLeast);
queue.add(task);
this.chain.addLast(queue);
return Boolean.TRUE;
/**
* 添加新层级任务
* @param tasks
* @return
*/
public boolean addNew(List<ITask> tasks, int atLeast)
if (Objects.isNull(tasks))
return Boolean.FALSE;
TaskQueueWrap queue = TaskQueueWrap.create(tasks,atLeast);
this.chain.add(queue);
return Boolean.TRUE;
private boolean add(TaskQueueWrap taskQueue)
if (Objects.isNull(taskQueue))
return Boolean.FALSE;
this.chain.add(taskQueue);
return Boolean.TRUE;
/**
* 指定层级添加任务
* @param task
* @param idx
* @return
*/
public boolean addIdx(ITask task,int idx)
if (Objects.isNull(task))
return Boolean.FALSE;
if (idx > this.chain.size())
return Boolean.FALSE;
if (idx == this.chain.size())
return this.addNew(task,1);
this.chain.get(idx).add(task);
return Boolean.TRUE;
public TaskChain execute()
//广度优先遍历
ThreadPoolExecutor executor = ThreadPoolUtils.getThreadPooll();
TaskChain failChain = new TaskChain();
//埋点1,任务开始执行
System.out.println("task starting");
for (int i = 0; i < this.chain.size(); i++)
//层级开始埋点
System.out.println(String.format("task level:%d starting", (i+1)));
TaskQueueWrap queueWrap = this.chain.get(i);
Queue<ITask> queue = queueWrap.getQueue();
int queueSize = queue.size();
CountDownLatch count = new CountDownLatch(queueSize);
List<Future<TaskResult>> list = new ArrayList<>();
while (queue.size() != 0)
TaskThread taskThread = new TaskThread(queue.poll(),count);
list.add(executor.submit(taskThread));
try
count.await(5, TimeUnit.SECONDS);
catch (InterruptedException e)
e.printStackTrace();
List<ITask> failTasks = counterAndFail(queueSize, list);
failChain.addNew(failTasks,queueWrap.getAtLeast());
//不满足至少,任务链结束
TaskCounter taskCounter = this.counters.getLast();
if (queueWrap.getAtLeast() > (taskCounter.getCounter()- taskCounter.getFailCounter()))
//将剩余的任务添加进去
for (int j = i+1; j < this.chain.size(); j++)
failChain.add(this.chain.get(j));
return failChain;
//层级结束埋点
System.out.println(String.format("task level:%d end", (i+1)));
System.out.println("task end");
return failChain;
private List<ITask> counterAndFail(int size, List<Future<TaskResult>> list)
List<ITask> failTasks = new ArrayList<>();
try
//计数
Integer failCounter = 0;
for (Future<TaskResult> f:list)
TaskResult result = f.get();
if (!result.isSuc())
failCounter++;
ITask task =(ITask) result.getData();
failTasks.add(task);
counters.add(new TaskCounter(size,failCounter));
catch (InterruptedException e)
e.printStackTrace();
catch (ExecutionException e)
e.printStackTrace();
return failTasks;
同时定义了一个返回体
/**
* 任务返回状态
* @Author: yxkong
* @Date: 2022/6/1 6:18 PM
* @version: 1.0
*/
public class TaskResult
//任务状态,1成功,0失败,失败的时候,data为原任务
private int status;
private String msg;
//任务数据
private Object data;
public TaskResult(int status, String msg, Object data)
this.status = status;
this.msg = msg;
this.data = data;
public TaskResult(int status, String msg)
this.status = status;
this.msg = msg;
public int getStatus()
return status;
public boolean isSuc()
if (status == 1)
return Boolean.TRUE;
return Boolean.FALSE;
public String getMsg()
return msg;
public Object getData()
return data;
计数器
/**
* 任务执行计数器
* 如果放在执行线程里,需要用AtomicInteger
* 如果在主线程里,用这个即可
* @Author: yxkong
* @Date: 2022/6/2 9:12 AM
* @version: 1.0
*/
public class TaskCounter
private int counter;
private int failCounter;
public TaskCounter(int counter, int failCounter)
this.counter = counter;
this.failCounter = failCounter;
public int getCounter()
return counter;
public void setCounter(int counter)
this.counter = counter;
public int getFailCounter()
return failCounter;
public void setFailCounter(int failCounter)
this.failCounter = failCounter;
定义任务线程
/**
* 任务线程
* @Author: yxkong
* @Date: 2022/6/1 6:32 PM
* @version: 1.0
*/
public class TaskThread implements Callable
private ITask task;
private CountDownLatch downLatch;
public TaskThread(ITask task, CountDownLatch downLatch)
this.task = task;
this.downLatch = downLatch;
@Override
public Object call() throws Exception
TaskResult rst = new TaskResult(0,"执行失败!");
try
rst = task.execute();
catch (Exception e)
e.printStackTrace();
finally
this.downLatch.countDown();
return rst;
测试
/**
* 测试类
* @Author: yxkong
* @Date: 2022/6/1 6:40 PM
* @version: 1.0
*/
public class TaskTest
public static void main(String[] args)
/**
* 任务链路的构建,可以直接从数据库里读取配置,然后构建成链
* 设计三张表,
* 一张表存任务链,描述这个任务链
* 第二张表,存储对应任务链的层级,以及对应层级的至少执行个数
* 第三张表,存储具体层级的任务,任务执行结果,执行时间,最后更新时间
* 可以再加辅助表,用于任务执行记录
*/
TaskChain chain = new TaskChain();
//第一层级添加任务
chain.addNew以上是关于一道任务编排服务面试题解析的主要内容,如果未能解决你的问题,请参考以下文章