一道任务编排服务面试题解析

Posted 5ycode

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一道任务编排服务面试题解析相关的知识,希望对你有一定的参考价值。

6月份的时候,群里有一个小伙伴分享了一道面试题,如下图:

趁着下班的功夫,拆解下这道面试题

  • 可以按顺序添加任意节点形成一个任务链
    • 链表且有序 或者数组(考虑到数组的扩容,直接pass调,用有序链表LinkedList)
    • 添加任意节点(可以指定索引添加)
  • 任务需要按顺序执行,同一个节点可能有多个任务(可以并行执行)
    • 线程池执行
    • 同一个节点的任务,广度优先,可以利用Queue
  • 节点完成可以指定任意个数
    • Queue需要包装起来
    • 同时需要计数,直接利用CountDownLatch
  • 每个节点都可能失败,失败后可以通过接口进行重试
    • 持久化保存节点任务执行状态
    • 如果强依赖,必须按顺序再走一遍,成功的可以忽略,只执行失败的
    • 只将失败任务重新串联起来
  • 实现监控功能,可以监控当前任务状态
    • 最简单的日志输出
    • 如果有kibana,直接filebeat抽取日志到es,然后在kibana里配置视图,或者定时从es里抽取失败任务,做钉钉或短信报警
    • 也可以filebeat抽取到kafka,然后消费处理,报警
  • 隐含的,还有一层,任务
    • 可以抽象出来,业务自定义去实现

ok,分析完了,开干。

定义任务并编写执行示例

/**
 * 任务接口
 * @Author: yxkong
 * @Date: 2022/6/1 6:24 PM
 * @version: 1.0
 */
public interface ITask 

    /**
     * 任务执行入口
     * @return
     */
    TaskResult execute();



/**
 * 任务示例,具体的任务实现
 * @Author: yxkong
 * @Date: 2022/6/1 6:55 PM
 * @version: 1.0
 */
public class TaskExample implements ITask
    private static Random random = new Random();
    //业务数据
    private String name;
    private int status;
    @Override
    public TaskResult execute() 
        System.out.println(String.format("%s task name:%s starting",Thread.currentThread().getName(), this.name));
        //TODO 执行具体的业务逻辑
        try 
            //添加随机等待,确保同一层级不使用一个线程
            Thread.sleep(Long.valueOf(random.nextInt(500)));
         catch (InterruptedException e) 
            e.printStackTrace();
        
        if (status == 1)
            System.out.println(String.format("%s task name:%s 任务执行成功",Thread.currentThread().getName(), this.name));
            return new TaskResult(1,"任务执行成功");
         else 
            System.out.println(String.format("%s task name:%s 任务执行失败",Thread.currentThread().getName(), this.name));
            //模拟失败后,再次要执行的任务是成功的(正常,直接添加即可)
            this.status = 1;
            return new TaskResult(0,"任务执行失败!",this);
        

    
    public TaskExample(String name, int status) 
        this.name = name;
        this.status = status;
    

    @Override
    public String toString() 
        return "ITask" +
                "name='" + name + '\\'' +
                ", status=" + status +
                '';
    

定义任务编排链路

先定义一个任务包装体

/**
 * 任务包装体
 * @Author: yxkong
 * @Date: 2022/6/1 6:37 PM
 * @version: 1.0
 */
public class TaskQueueWrap 
    private Queue<ITask> queue ;
    //至少完成个数
    private int atLeast;

    private TaskQueueWrap(int atLeast) 
        this.queue = new ArrayDeque<>();
        this.atLeast = atLeast;
    

    public Queue<ITask> getQueue() 
        return queue;
    

    public int getAtLeast() 
        return atLeast;
    
    public boolean add(ITask task)
        return this.queue.add(task);
    
    public static TaskQueueWrap create(List<ITask> tasks,int atLeast)
        TaskQueueWrap queue = new TaskQueueWrap(atLeast);
        for (ITask task:tasks)
            queue.add(task);
        
        return queue;
    
    public static TaskQueueWrap createEmpty(int atLeast)
        return new TaskQueueWrap(atLeast);
    


定义任务链,并暴露各种操作方法,包括

  • addNew(...)添加新层级任务(单个或批量)
  • addLast(...)在最后一个层级上添加任务
  • addIdx(...) 指定层级添加任务
  • execute() 链路执行
/**
 * 任务链
 * @Author: yxkong
 * @Date: 2022/6/1 6:34 PM
 * @version: 1.0
 */
public class TaskChain 
    private LinkedList<TaskQueueWrap> chain;
    private LinkedList<TaskCounter> counters;

    public TaskChain() 
        this.chain = new LinkedList<>();
        this.counters = new LinkedList<>();
    

    /**
     * 在最后一个层级上添加任务
     * @param task
     * @return
     */
    public boolean addLast(ITask task)
        TaskQueueWrap queue = chain.getLast();
        if (Objects.isNull(task) || Objects.isNull(queue))
            return Boolean.FALSE;
        
        queue.add(task);
        return Boolean.TRUE;
    

    /**
     * 添加新层级任务
     * @param task
     * @return
     */
    public boolean addNew(ITask task, int atLeast)
        if (Objects.isNull(task))
            return Boolean.FALSE;
        
        TaskQueueWrap queue =TaskQueueWrap.createEmpty(atLeast);
        queue.add(task);
        this.chain.addLast(queue);
        return Boolean.TRUE;
    
    /**
     * 添加新层级任务
     * @param tasks
     * @return
     */
    public boolean addNew(List<ITask> tasks, int atLeast)
        if (Objects.isNull(tasks))
            return Boolean.FALSE;
        
        TaskQueueWrap queue = TaskQueueWrap.create(tasks,atLeast);
        this.chain.add(queue);
        return Boolean.TRUE;
    
    private boolean add(TaskQueueWrap taskQueue) 
        if (Objects.isNull(taskQueue))
            return Boolean.FALSE;
        
        this.chain.add(taskQueue);
        return Boolean.TRUE;
    

    /**
     * 指定层级添加任务
     * @param task
     * @param idx
     * @return
     */
    public boolean addIdx(ITask task,int idx)
        if (Objects.isNull(task))
            return Boolean.FALSE;
        
        if (idx > this.chain.size())
            return Boolean.FALSE;
        
        if (idx == this.chain.size())
           return  this.addNew(task,1);
        
        this.chain.get(idx).add(task);
        return Boolean.TRUE;
    

    public TaskChain execute() 
        //广度优先遍历
        ThreadPoolExecutor executor = ThreadPoolUtils.getThreadPooll();
        TaskChain failChain = new TaskChain();
        //埋点1,任务开始执行
        System.out.println("task starting");
        for (int i = 0; i < this.chain.size(); i++) 
            //层级开始埋点
            System.out.println(String.format("task level:%d starting", (i+1)));
            TaskQueueWrap queueWrap = this.chain.get(i);
            Queue<ITask> queue = queueWrap.getQueue();
            int queueSize = queue.size();
            CountDownLatch count = new CountDownLatch(queueSize);
            List<Future<TaskResult>> list = new ArrayList<>();
            while (queue.size() != 0)
                TaskThread taskThread = new TaskThread(queue.poll(),count);
                list.add(executor.submit(taskThread));
            
            try 
                count.await(5, TimeUnit.SECONDS);


             catch (InterruptedException e) 
                e.printStackTrace();
            
            List<ITask> failTasks = counterAndFail(queueSize, list);
            failChain.addNew(failTasks,queueWrap.getAtLeast());
            //不满足至少,任务链结束
            TaskCounter taskCounter = this.counters.getLast();
            if (queueWrap.getAtLeast() > (taskCounter.getCounter()- taskCounter.getFailCounter()))
                //将剩余的任务添加进去
                for (int j = i+1; j < this.chain.size(); j++) 
                    failChain.add(this.chain.get(j));
                
                return failChain;
            

            //层级结束埋点
            System.out.println(String.format("task level:%d end", (i+1)));
        

        System.out.println("task end");
        return failChain;
    



    private List<ITask> counterAndFail(int size, List<Future<TaskResult>> list)
        List<ITask> failTasks = new ArrayList<>();
        try 
            //计数
            Integer failCounter = 0;
            for (Future<TaskResult> f:list)
                TaskResult result = f.get();
                if (!result.isSuc())
                    failCounter++;
                    ITask task =(ITask) result.getData();
                    failTasks.add(task);
                
            
            counters.add(new TaskCounter(size,failCounter));

         catch (InterruptedException e) 
            e.printStackTrace();
         catch (ExecutionException e) 
            e.printStackTrace();
        
        return failTasks;
    

同时定义了一个返回体

/**
 * 任务返回状态
 * @Author: yxkong
 * @Date: 2022/6/1 6:18 PM
 * @version: 1.0
 */
public class TaskResult 
    //任务状态,1成功,0失败,失败的时候,data为原任务
    private int status;
    private String msg;
    //任务数据
    private Object data;

    public TaskResult(int status, String msg, Object data) 
        this.status = status;
        this.msg = msg;
        this.data = data;
    

    public TaskResult(int status, String msg) 
        this.status = status;
        this.msg = msg;
    

    public int getStatus() 
        return status;
    
    public boolean isSuc()
        if (status == 1)
            return Boolean.TRUE;
        
        return Boolean.FALSE;
    

    public String getMsg() 
        return msg;
    

    public Object getData() 
        return data;
    

计数器

/**
 * 任务执行计数器
 *   如果放在执行线程里,需要用AtomicInteger
 *   如果在主线程里,用这个即可
 * @Author: yxkong
 * @Date: 2022/6/2 9:12 AM
 * @version: 1.0
 */
public class TaskCounter 

    private int counter;
    private int failCounter;

    public TaskCounter(int counter, int failCounter) 
        this.counter = counter;
        this.failCounter = failCounter;
    

    public int getCounter() 
        return counter;
    

    public void setCounter(int counter) 
        this.counter = counter;
    

    public int getFailCounter() 
        return failCounter;
    

    public void setFailCounter(int failCounter) 
        this.failCounter = failCounter;
    

定义任务线程

/**
 * 任务线程
 * @Author: yxkong
 * @Date: 2022/6/1 6:32 PM
 * @version: 1.0
 */
public class TaskThread implements Callable 
    private ITask task;
    private CountDownLatch downLatch;

    public TaskThread(ITask task, CountDownLatch downLatch) 
        this.task = task;
        this.downLatch = downLatch;
    
    @Override
    public Object call() throws Exception 
        TaskResult rst = new TaskResult(0,"执行失败!");
        try 
            rst = task.execute();
         catch (Exception e) 
            e.printStackTrace();
        finally 
            this.downLatch.countDown();
        
        return rst;
    

测试

/**
 * 测试类
 * @Author: yxkong
 * @Date: 2022/6/1 6:40 PM
 * @version: 1.0
 */
public class TaskTest 
    public static void main(String[] args) 
         /**
         *  任务链路的构建,可以直接从数据库里读取配置,然后构建成链
         *  设计三张表,
         *    一张表存任务链,描述这个任务链
         *    第二张表,存储对应任务链的层级,以及对应层级的至少执行个数
         *    第三张表,存储具体层级的任务,任务执行结果,执行时间,最后更新时间
         *  可以再加辅助表,用于任务执行记录
         */
        TaskChain chain = new TaskChain();
        
        
        //第一层级添加任务
        chain.addNew以上是关于一道任务编排服务面试题解析的主要内容,如果未能解决你的问题,请参考以下文章

一道任务编排服务面试题解析

一道面试题了解计算机网络

高频面试题:JavaScript事件循环机制解析

Java架构面试必知必会的微服务面试题解析

记一道字节跳动的算法面试题

一道微软面试题的运算过程解析