异步任务串行解决方案

Posted Androider_Zxg

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了异步任务串行解决方案相关的知识,希望对你有一定的参考价值。

问题描述

在业务开发时,有时不仅仅是拉取一个数据接口展示列表这么简单。举一个购买场景:

  • 第一步调用网络接口登录
  • 第二步调用网络接口购买
  • 第三步查询购买结果
  • 第四步调用DBApi将购买结果写入本地
  • 第五步向外回调结果

这里所有的操作都是异步的,再举一个聊天业务场景的例子。当收到有新消息通知。需要拉取获取新消息的网络接口以获得新消息。有这样几步:

  • 拉取本地DBApi查询本地保持的最大消息Id,以此作为查询新消息的参数
  • 拉取查询新消息的网络接口
  • 将查询到的消息数据保存在DB
  • 保存后向外回调

这里每一步都是异步的,同时上一步的结果将作为下一步的入参。这就是典型的异步任务串行的场景。其实Rxjava提供了这种场景的解决方案,今天我们试着自己写一套解决该问题的方案。

思考

方案一

这里说到底就是一个任务分发的问题,我们可以建立一个TaskCenter。在每一个异步任务回调中,将异步任务的结果和标记自身的tag。转发给TaskCenter,TaskCenter根据tag和结果来创建下一个异步任务。并执行。这种方案可能是我们最容易想到的方案,也是代码实现上最简单的方案。类似于中介者模式。但是这种方式也有如下缺点

  • TaskCenter中承载了所有业务的逻辑,像是一团线中打的结。所有的任务都需要通过它寻找下一个任务节点
  • Task类过于多,每一个任务,我们都需要写一个类。来承载他的逻辑。当业务复杂时,需要创建很多Task类

图示如下:

方案二

我们上述提到,每一个任务就是一个节点,一个节点执行完去寻找下一个节点。那么我们可以想到一种常用的数据结构:链表。每一个业务场景都是一个任务链表,一个节点一个节点执行。直到执行到链表的尾端。当我们编写一个业务场景时,首先确立好有哪几个步骤,接着创建这样一个链表,最后开始执行。优点:

  • 每个业务场景都是一个任务链,各场景间不会有代码逻辑上耦合
  • 代码可读性增强,很清晰的能看到业务场景的每一步都是做什么

图示如下

代码设计

这里我们实现方案二,我们需要解决如下几个问题,代码也就写出来了

  • 链表建立及节点插入
  • 启动链表执行第一个节点任务
  • 任务结果传递给下一个节点并启动节点任务
  • 线程调度

这里我们给出设计类图,再依次介绍每个类的作用

AsyncTaskNode

任务节点,其中会维护下一节点的指针,并定义抽象方法doLogic()由具体任务实现
。action方法提供给任务链AsyncTaskChain调用,action方法负责将doLogic()抛给线程调度器处理或直接执行doLogic方法

AsyncTaskChain

任务链,其中维护了链表的头结点、尾节点及当前节点。提供插入节点的doNext()方法及移动当前节点指针的goNext()方法。goNext()方法中会调用节点的action方法并将当前指针向前移动

BaseCallback

实现Callback接口中onResult()方法,其中引用任务链。在onResult()方法中调用任务链的doNext()方法向前移动指针。

SingleThreadExexutor

封装线程及队列,提供schedule()方法向任务队列里面插入任务。作为线程调度器。

代码实现

这里我直接贴出代码实现

AsyncTaskNode

public abstract class AsyncTaskNode implements SingleThreadExecutor.Callable 
    public String tag;
    protected AsyncTaskNode next;

    private SingleThreadExecutor executor;
    private Object param;

    public AsyncTaskNode(String tag) 
        this.tag = tag;
    

    public AsyncTaskNode(String tag, SingleThreadExecutor executor) 
        this.tag = tag;
        this.executor = executor;
    

    abstract void doLogic(Object o);

    protected void action(Object o) 
        this.param = o;
        if (executor != null) 
            //若线程调度器不为空,将任务交给调度器调度执行
            executor.schedule(this);
         else 
            //否则直接执行逻辑
            doLogic(o);
        
    

    public Object getParam() 
        return param;
    

    @Override
    public void call() 
        doLogic(param);
    

AsyncTaskChain

public class AsyncTaskChain 
    public AsyncTaskNode head;
    public AsyncTaskNode tail;
    public AsyncTaskNode current;

    public void startTask() 
        current = head.next;
        if (head != null) 
            head.action(null);
        
    

    public AsyncTaskChain doNext(AsyncTaskNode task) 
        if (head == null) 
            head = task;
            tail = task;
            current = head;
         else 
            tail.next = task;
            tail = tail.next;
        
        return this;
    

    public void goNext(Object o) 
        if (current != null) 
            System.out.println("goNext current:" + current.tag + "current thread:" + Thread.currentThread().getName());
            current.action(o);
            current = current.next;
        

    

SingleThreadExecutor

public class SingleThreadExecutor 
    private String name;
    private Thread thread;
    private LinkedBlockingQueue<Callable> blockingQueue;
    private AtomicBoolean stopFlag;


    public SingleThreadExecutor(String name) 
        this.name = name;
        stopFlag = new AtomicBoolean();
        stopFlag.set(false);
        blockingQueue = new LinkedBlockingQueue<>();
        thread = new Thread(this::loopQueue);
        thread.setName(name);
        thread.start();
    

    private void loopQueue() 
        while (blockingQueue != null && !stopFlag.get() && thread != null && (!thread.isInterrupted())) 
            Callable task = null;
            try 
                task = blockingQueue.take();
                task.call();
             catch (InterruptedException e) 
                e.printStackTrace();
            

        
    

    public void schedule(Callable callable) 
        if (blockingQueue != null && (!stopFlag.get())) 
            blockingQueue.offer(callable);
        
    

    public void destroy() 
        stopFlag.set(true);
        blockingQueue.clear();
        thread.interrupt();
        thread = null;
    

    public interface Callable 
        void call();
    

Callback

public interface Callback 
    void onResult(Object o);

BaseCallback

public class BaseCallback implements Callback 
    private AsyncTaskChain chain;

    public BaseCallback(AsyncTaskChain chain) 
        this.chain = chain;
    

    @Override
    public void onResult(Object o) 
        if (chain != null) 
            chain.goNext(o);
        
    

测试代码

NetMission

该类提供模拟异步的网络方法

public class NetMission 
    public static void doLogin(String phone, Callback callBack) 
        new Thread(new Runnable() 
            @Override
            public void run() 
                System.out.println("doLogin: params = [" + phone + "]");
                callBack.onResult("login result");
            
        ).start();

    

    public static void doBuy(String str, Callback callBack) 
        new Thread(new Runnable() 
            @Override
            public void run() 
                System.out.println("doBuy params = [" + str + "]");
                callBack.onResult("dobuy result");
            
        ).start();

    

    public static void queryResult(String str, Callback callBack) 
        new Thread(new Runnable() 
            @Override
            public void run() 
                System.out.println("doQueryResult params = [" + str + "]");
                callBack.onResult("queryResult result");
            
        ).start();

    

测试类

public class Test 
    public static void main(String[] args) 
        //不使用线程调度器
        AsyncTaskChain taskChain1 = new AsyncTaskChain();
        taskChain1
                .doNext(new AsyncTaskNode("LoginTask") 
                    @Override
                    void doLogic(Object o) 
                        NetMission.doLogin("login params", new BaseCallback(taskChain1));
                    
                )
                .doNext(new AsyncTaskNode("BuyTask") 
                    @Override
                    void doLogic(Object o) 
                        NetMission.doBuy((String) o, new BaseCallback(taskChain1));
                    
                )
                .doNext(new AsyncTaskNode("third") 
                    @Override
                    void doLogic(Object o) 
                        NetMission.queryResult((String) o, new BaseCallback(taskChain1));
                    
                ).startTask();

        //使用线程调度器
        SingleThreadExecutor businessQueue = new SingleThreadExecutor("BusinessQueue");
        SingleThreadExecutor callBackQueue = new SingleThreadExecutor("CallbackQueue");

        AsyncTaskChain chain = new AsyncTaskChain();
        chain
                .doNext(new AsyncTaskNode("LoginTask", businessQueue) 
                    @Override
                    void doLogic(Object o) 
                        System.out.println("goLogic:doLogin current thread:" + Thread.currentThread().getName());
                        NetMission.doLogin("login params", new BaseCallback(chain));
                    
                )
                .doNext(new AsyncTaskNode("BuyTask", businessQueue) 
                    @Override
                    void doLogic(Object o) 
                        System.out.println("goLogic:doBuy current thread:" + Thread.currentThread().getName());
                        NetMission.doBuy((String) o, new BaseCallback(chain));
                    
                )
                .doNext(new AsyncTaskNode("QueryResultTask", businessQueue) 
                    @Override
                    void doLogic(Object o) 
                        System.out.println("goLogic:doQueryResult current thread:" + Thread.currentThread().getName());
                        NetMission.queryResult((String) o, new BaseCallback(chain));
                    
                )
                .doNext(new AsyncTaskNode("CallbackTask", callBackQueue) 
                    @Override
                    void doLogic(Object o) 
                        System.out.println("goLogic:Callback current thread:" + Thread.currentThread().getName());
                        System.out.println("callBack result:" + o);
                    
                )
                .startTask();
    

打印结果

goLogic:doLogin current thread:BusinessQueue
doLogin: params = [login params]
goNext current:BuyTaskcurrent thread:Thread-2
goLogic:doBuy current thread:BusinessQueue
doBuy params = [login result]
goNext current:QueryResultTaskcurrent thread:Thread-3
goLogic:doQueryResult current thread:BusinessQueue
doQueryResult params = [dobuy result]
goNext current:CallbackTaskcurrent thread:Thread-4
goLogic:Callback current thread:CallbackQueue
callBack result:queryResult result

可以看到顺序调用了异步任务,并且上一个任务的结果作为下一个任务的参数,把任务链串联起来。这里我们需要关注一个问题,那就是线程调度。可以看到dologic()方法是在我们创建AsyncTaskNode时指定的线程调度器中执行。但是,dologic()中可能调用到其他组件,例如网路API,数据库API。那么代码就跳入了其他组件的工作线程中执行(例如log中的Thread-2/Thread-3),那么调用其他组件时传入的callback的onResult()方法自然也在其他组件的工作线程执行。我们需要保证下一个任务的doLogic()方法跳回指定的线程调度器中执行,如何做到这一点呢?节点的action()方法中将自己抛给线程调度器,线程调度器会在轮询到任务时,调用其doLogic()方法。保证了线程的可靠性。

结语

最终leader并没有采用我的方案,原因如下

  • 1、没必要自己维护链表,采用Java提供的队列即可实现顺序功能

  • 2、没必要自己维护线程队列,采用Java单线程池即可

  • 3、最严重的问题,也是我没有考虑到的问题:访问链表goNext()是不同线程访问的。这样有可能导致线程安全问题,但实际上不会,因为虽然是不同线程访问,但是是顺序访问。

最后,我也对自己的设计进行了反思,会针对leader提出的问题重写设计,尽量使用Java jdk中组件。希望能对大家在解决异步任务串行问题时有所帮助。

转载请注明出处

https://blog.csdn.net/u012545728/article/details/88708344

以上是关于异步任务串行解决方案的主要内容,如果未能解决你的问题,请参考以下文章

异步任务串行解决方案

同步 ,异步,并发/并行,串行

iOS多线程——同步异步串行并行

ios多线程同步异步、串行并行队列、死锁

帮你快速理解同步 ,异步,并发/并行,串行

iOS面试系列-2多线程中同步异步和串行并行之间的逻辑关系(必考,必须掌握)