异步任务串行解决方案
Posted Androider_Zxg
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了异步任务串行解决方案相关的知识,希望对你有一定的参考价值。
问题描述
在业务开发时,有时不仅仅是拉取一个数据接口展示列表这么简单。举一个购买场景:
- 第一步调用网络接口登录
- 第二步调用网络接口购买
- 第三步查询购买结果
- 第四步调用DBApi将购买结果写入本地
- 第五步向外回调结果
这里所有的操作都是异步的,再举一个聊天业务场景的例子。当收到有新消息通知。需要拉取获取新消息的网络接口以获得新消息。有这样几步:
- 拉取本地DBApi查询本地保持的最大消息Id,以此作为查询新消息的参数
- 拉取查询新消息的网络接口
- 将查询到的消息数据保存在DB
- 保存后向外回调
这里每一步都是异步的,同时上一步的结果将作为下一步的入参。这就是典型的异步任务串行的场景。其实Rxjava提供了这种场景的解决方案,今天我们试着自己写一套解决该问题的方案。
思考
方案一
这里说到底就是一个任务分发的问题,我们可以建立一个TaskCenter。在每一个异步任务回调中,将异步任务的结果和标记自身的tag。转发给TaskCenter,TaskCenter根据tag和结果来创建下一个异步任务。并执行。这种方案可能是我们最容易想到的方案,也是代码实现上最简单的方案。类似于中介者模式。但是这种方式也有如下缺点
- TaskCenter中承载了所有业务的逻辑,像是一团线中打的结。所有的任务都需要通过它寻找下一个任务节点
- Task类过于多,每一个任务,我们都需要写一个类。来承载他的逻辑。当业务复杂时,需要创建很多Task类
图示如下:
方案二
我们上述提到,每一个任务就是一个节点,一个节点执行完去寻找下一个节点。那么我们可以想到一种常用的数据结构:链表。每一个业务场景都是一个任务链表,一个节点一个节点执行。直到执行到链表的尾端。当我们编写一个业务场景时,首先确立好有哪几个步骤,接着创建这样一个链表,最后开始执行。优点:
- 每个业务场景都是一个任务链,各场景间不会有代码逻辑上耦合
- 代码可读性增强,很清晰的能看到业务场景的每一步都是做什么
图示如下
代码设计
这里我们实现方案二,我们需要解决如下几个问题,代码也就写出来了
- 链表建立及节点插入
- 启动链表执行第一个节点任务
- 任务结果传递给下一个节点并启动节点任务
- 线程调度
这里我们给出设计类图,再依次介绍每个类的作用
AsyncTaskNode
任务节点,其中会维护下一节点的指针,并定义抽象方法doLogic()由具体任务实现
。action方法提供给任务链AsyncTaskChain调用,action方法负责将doLogic()抛给线程调度器处理或直接执行doLogic方法
AsyncTaskChain
任务链,其中维护了链表的头结点、尾节点及当前节点。提供插入节点的doNext()方法及移动当前节点指针的goNext()方法。goNext()方法中会调用节点的action方法并将当前指针向前移动
BaseCallback
实现Callback接口中onResult()方法,其中引用任务链。在onResult()方法中调用任务链的doNext()方法向前移动指针。
SingleThreadExexutor
封装线程及队列,提供schedule()方法向任务队列里面插入任务。作为线程调度器。
代码实现
这里我直接贴出代码实现
AsyncTaskNode
public abstract class AsyncTaskNode implements SingleThreadExecutor.Callable
public String tag;
protected AsyncTaskNode next;
private SingleThreadExecutor executor;
private Object param;
public AsyncTaskNode(String tag)
this.tag = tag;
public AsyncTaskNode(String tag, SingleThreadExecutor executor)
this.tag = tag;
this.executor = executor;
abstract void doLogic(Object o);
protected void action(Object o)
this.param = o;
if (executor != null)
//若线程调度器不为空,将任务交给调度器调度执行
executor.schedule(this);
else
//否则直接执行逻辑
doLogic(o);
public Object getParam()
return param;
@Override
public void call()
doLogic(param);
AsyncTaskChain
public class AsyncTaskChain
public AsyncTaskNode head;
public AsyncTaskNode tail;
public AsyncTaskNode current;
public void startTask()
current = head.next;
if (head != null)
head.action(null);
public AsyncTaskChain doNext(AsyncTaskNode task)
if (head == null)
head = task;
tail = task;
current = head;
else
tail.next = task;
tail = tail.next;
return this;
public void goNext(Object o)
if (current != null)
System.out.println("goNext current:" + current.tag + "current thread:" + Thread.currentThread().getName());
current.action(o);
current = current.next;
SingleThreadExecutor
public class SingleThreadExecutor
private String name;
private Thread thread;
private LinkedBlockingQueue<Callable> blockingQueue;
private AtomicBoolean stopFlag;
public SingleThreadExecutor(String name)
this.name = name;
stopFlag = new AtomicBoolean();
stopFlag.set(false);
blockingQueue = new LinkedBlockingQueue<>();
thread = new Thread(this::loopQueue);
thread.setName(name);
thread.start();
private void loopQueue()
while (blockingQueue != null && !stopFlag.get() && thread != null && (!thread.isInterrupted()))
Callable task = null;
try
task = blockingQueue.take();
task.call();
catch (InterruptedException e)
e.printStackTrace();
public void schedule(Callable callable)
if (blockingQueue != null && (!stopFlag.get()))
blockingQueue.offer(callable);
public void destroy()
stopFlag.set(true);
blockingQueue.clear();
thread.interrupt();
thread = null;
public interface Callable
void call();
Callback
public interface Callback
void onResult(Object o);
BaseCallback
public class BaseCallback implements Callback
private AsyncTaskChain chain;
public BaseCallback(AsyncTaskChain chain)
this.chain = chain;
@Override
public void onResult(Object o)
if (chain != null)
chain.goNext(o);
测试代码
NetMission
该类提供模拟异步的网络方法
public class NetMission
public static void doLogin(String phone, Callback callBack)
new Thread(new Runnable()
@Override
public void run()
System.out.println("doLogin: params = [" + phone + "]");
callBack.onResult("login result");
).start();
public static void doBuy(String str, Callback callBack)
new Thread(new Runnable()
@Override
public void run()
System.out.println("doBuy params = [" + str + "]");
callBack.onResult("dobuy result");
).start();
public static void queryResult(String str, Callback callBack)
new Thread(new Runnable()
@Override
public void run()
System.out.println("doQueryResult params = [" + str + "]");
callBack.onResult("queryResult result");
).start();
测试类
public class Test
public static void main(String[] args)
//不使用线程调度器
AsyncTaskChain taskChain1 = new AsyncTaskChain();
taskChain1
.doNext(new AsyncTaskNode("LoginTask")
@Override
void doLogic(Object o)
NetMission.doLogin("login params", new BaseCallback(taskChain1));
)
.doNext(new AsyncTaskNode("BuyTask")
@Override
void doLogic(Object o)
NetMission.doBuy((String) o, new BaseCallback(taskChain1));
)
.doNext(new AsyncTaskNode("third")
@Override
void doLogic(Object o)
NetMission.queryResult((String) o, new BaseCallback(taskChain1));
).startTask();
//使用线程调度器
SingleThreadExecutor businessQueue = new SingleThreadExecutor("BusinessQueue");
SingleThreadExecutor callBackQueue = new SingleThreadExecutor("CallbackQueue");
AsyncTaskChain chain = new AsyncTaskChain();
chain
.doNext(new AsyncTaskNode("LoginTask", businessQueue)
@Override
void doLogic(Object o)
System.out.println("goLogic:doLogin current thread:" + Thread.currentThread().getName());
NetMission.doLogin("login params", new BaseCallback(chain));
)
.doNext(new AsyncTaskNode("BuyTask", businessQueue)
@Override
void doLogic(Object o)
System.out.println("goLogic:doBuy current thread:" + Thread.currentThread().getName());
NetMission.doBuy((String) o, new BaseCallback(chain));
)
.doNext(new AsyncTaskNode("QueryResultTask", businessQueue)
@Override
void doLogic(Object o)
System.out.println("goLogic:doQueryResult current thread:" + Thread.currentThread().getName());
NetMission.queryResult((String) o, new BaseCallback(chain));
)
.doNext(new AsyncTaskNode("CallbackTask", callBackQueue)
@Override
void doLogic(Object o)
System.out.println("goLogic:Callback current thread:" + Thread.currentThread().getName());
System.out.println("callBack result:" + o);
)
.startTask();
打印结果
goLogic:doLogin current thread:BusinessQueue
doLogin: params = [login params]
goNext current:BuyTaskcurrent thread:Thread-2
goLogic:doBuy current thread:BusinessQueue
doBuy params = [login result]
goNext current:QueryResultTaskcurrent thread:Thread-3
goLogic:doQueryResult current thread:BusinessQueue
doQueryResult params = [dobuy result]
goNext current:CallbackTaskcurrent thread:Thread-4
goLogic:Callback current thread:CallbackQueue
callBack result:queryResult result
可以看到顺序调用了异步任务,并且上一个任务的结果作为下一个任务的参数,把任务链串联起来。这里我们需要关注一个问题,那就是线程调度。可以看到dologic()方法是在我们创建AsyncTaskNode时指定的线程调度器中执行。但是,dologic()中可能调用到其他组件,例如网路API,数据库API。那么代码就跳入了其他组件的工作线程中执行(例如log中的Thread-2/Thread-3),那么调用其他组件时传入的callback的onResult()方法自然也在其他组件的工作线程执行。我们需要保证下一个任务的doLogic()方法跳回指定的线程调度器中执行,如何做到这一点呢?节点的action()方法中将自己抛给线程调度器,线程调度器会在轮询到任务时,调用其doLogic()方法。保证了线程的可靠性。
结语
最终leader并没有采用我的方案,原因如下
-
1、没必要自己维护链表,采用Java提供的队列即可实现顺序功能
-
2、没必要自己维护线程队列,采用Java单线程池即可
-
3、最严重的问题,也是我没有考虑到的问题:访问链表goNext()是不同线程访问的。这样有可能导致线程安全问题,但实际上不会,因为虽然是不同线程访问,但是是顺序访问。
最后,我也对自己的设计进行了反思,会针对leader提出的问题重写设计,尽量使用Java jdk中组件。希望能对大家在解决异步任务串行问题时有所帮助。
转载请注明出处
https://blog.csdn.net/u012545728/article/details/88708344
以上是关于异步任务串行解决方案的主要内容,如果未能解决你的问题,请参考以下文章