《高性能利器》--异步调用有哪些实现方式?
Posted 敲代码的程序狗
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《高性能利器》--异步调用有哪些实现方式?相关的知识,希望对你有一定的参考价值。
什么是异步
同步调用:调用方在调用过程中,持续等待返回结果。
异步调用:调用方在调用过程中,不直接等待返回结果,而是执行其他任务,结果返回形式通常为回调函数。
脱离IO
,单独讨论同步
和异步
,我们更加容易去理解它的原理,同步和异步其实属于一种通信机制
,表达的是,我们在通信过程中,是主动去询问,还是等对方主动反馈。体现在同步(主动)
和异步(被动)
上。
进程内异步调用
1、Thread
进程和线程:进程是资源分配的最小单位,线程是CPU调度的最小单位
Java进程
内最简单的异步调用方式,就是通过 new Thread().start()
的方式,启动新的线程进行任务的执行(CPU调度
)。
public static void main(String[] args) {
System.out.println("煲水");
//创建新的线程
Thread thread1= new Thread(()->{
try {
Thread.sleep(5000);
System.out.println("水开了,"+Thread.currentThread().getName());
}catch (Exception e){
e.printStackTrace();
}
});
thread1.start();
System.out.println("运动");
}
1.1、start() 和 run()
在上述实例代码中,我们虽然采用了实现 Runnable
接口的方式,进行新线程的实现,但是在方法启动时,并没有使用 run()
方法,而是使用了 start()
方法。
run():使用当前线程执行 run()方法调用,可以理解时同步调用
start()
方法在调用时,在代码逻辑中,会调用到一个本地方法 start0
下载 JDK源码后,可以看到 Thread 类
有个 registerNatives
本地方法,该方法主要的作用就是注册一些本地方法供 Thread 类使用,如 start0(),stop0()
等等,可以说,所有操作本地线程的本地方法都是由它注册的。
可以看出 Java 线程
调用 start->start0
的方法,实际上会调用到 JVM_StartThread
方法,通过调用 new JavaThread(&thread_entry,sz)
完成线程的创建。
在 jvm.cpp
中,有如下代码段:
在创建完线程后,通过 thread_entry
完成 run()
方法的调用
1.2、Future
Future
的调用方式,属于同步非阻塞
, 主要原因在于,在获取异步线程处理结果时,需要主线程主动去获取,异步线程并没有通过主动通知
的方式,将数据结构进行更新
或回调
。
public static void main(String[] args) throws Exception {
System.out.println("煲水");
FutureTask<String> futureTask = new FutureTask(()->{
try {
Thread.sleep(5000);
System.out.println("水开了,"+Thread.currentThread().getName());
}catch (Exception e){
e.printStackTrace();
}
return "water";
});
//创建新的线程
Thread thread1= new Thread(futureTask);
thread1.start();
System.out.println("运动");
Thread.sleep(3000);
//阻塞等待数据
String result= futureTask.get(5, TimeUnit.SECONDS);
System.out.println("喝水," + result);
}
Future 的实现原理
类的继承关系图如下,可以看到 FutureTask
,实现了 Runnable
接口,那么在重写的run()
方法中,可以看到,在调用 call()
方法获取到结果后,通过CAS
的方式,更新到成员变量
中。
任务调用结果更新:
1.3、ThreadPoolExecutor
public static void main(String[] args) throws Exception {
ExecutorService executors = Executors.newFixedThreadPool(10);
System.out.println("煲水");
Future<String> future = executors.submit(() -> {
try {
Thread.sleep(5000);
System.out.println("水开了," + Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
return "water";
});
System.out.println("运动");
String result = future.get(5, TimeUnit.SECONDS);
System.out.println("喝水," + result);
}
上面讲解了 FutureTask
的实现原理后,这里在对比 submit()
和 execute()
,就比较容易理解了,在submit()
方法中,将 Callable<T>
实现类,封装成了 FutureTask
, 然后再进行实际的调用:
1.4、总结
核心区别在于 start()
和 run()
, start()是启动一条新的线程
的同时,完成run()
方法,这时候是一个异步操作
;如果直接执行run()
方法, 则会在当前线程
直接执行,是一个同步阻塞操作
。
而 Future
的调用方式,则是一个 同步非阻塞
处理,在提交了任务
后,不阻塞主线程的继续执行,在到了一定时间后,主线程可以通过get()
方法,获取异步任务
处理结果。
ThreadPoolExecutor
则是维护了一个可复用的线程池,解决了资源复用
,性能耗时
的问题, Java线程
默认大小为1MB
,线程的创建
和销毁
都会占用内存和GC耗时;而线程的无限制创建, 则会带来CPU负载过高
,每个线程分配的时间片很少,导致处理效率低。
2、EventBus
public class JiulingTest {
public static void main(String[] args) throws Exception {
System.out.println("开始");
//使用异步事件总线
EventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(10));
// 向上述EventBus对象中注册一个监听对象
eventBus.register(new EventListener());
// 使用EventBus发布一个事件,该事件会给通知到所有注册的监听者
eventBus.post(new Event("煲水"));
System.out.println("运动");
}
}
// 事件,监听者监听的事件的包装对象
class Event {
//事件动作
public String action;
Event(String action) {
this.action = action;
}
}
// 监听者
class EventListener {
// 监听的方法,必须使用注解声明,且只能有一个参数,实际触发一个事件的时候会根据参数类型触发方法
@Subscribe
public void listen(Event event) {
try {
System.out.println("Event listener receive message: " + event.action + " threadName:" + Thread.currentThread().getName());
Thread.sleep(5000);
System.out.println("水开了!");
}catch (Exception e){
e.printStackTrace();
}
}
}
2.1、观察者模式
在 EventBus
中,通过 @Subscribe
定义了抽象观察者的行为, 通过入参
区分不同的事件监听动作
,如上述的示例代码中, listen(Event event)
只会观察这个类的事件。
/**
* Returns all subscribers for the given listener grouped by the type of event they subscribe to.
*/
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
Class<?> clazz = listener.getClass();
//遍历 @Subscribe 的方法
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
//然后根据 参数类型,也就是事件类型,进行归类
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
然后在进行事件发布的时候,通过调用 EventBus.post()
方法,遍历找到所有的监听方法:
public void post(Object event) {
//从上述归类的Map 中,找到所有的观察者方法
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
//事件分发,具体调用
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
2.2、AsyncEventBus
在示例代码中,我们使用的是 new AsyncEventBus(Executors.newFixedThreadPool(10))
构建的异步事件总线。
由下往上倒推,我们先看 Listern
,是如何执行事件处理方法的,这里比较好理解
,通过线程池完成任务的调用,具体实现是 通过反射的方式调用 @Subscribe
注解的方法。
那么这里的 executor
是怎么来的呢?
this.executor = bus.executor(); //从事件总线传递过来
回到 EventBus
中,我们可以看到构造函数并没有提供初始化线程池的入口,那么默认线程池的创建,可以跟踪到
这个线程池的 execute
方法,并没有创建新的线程执行 Runnable 方法
,而是使用当前线程
执行(具体逻辑参考1.1
)。 因此 EventBus
是不支持异步事件处理的!
在 dispatchEvent
方法中,比较直接可以看到整体设计中,是支持异步事件的,我们需要做的就是将 Executor
修改成一个合理的线程池, 而 AsyncEventBus
恰恰提供了这个能力。
/**
* Creates a new AsyncEventBus that will use {@code executor} to dispatch events.
*
* @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
* down the executor after the last event has been posted to this event bus.
*/
public AsyncEventBus(Executor executor) {
super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
3、Spring Event
Spring Event
与 Event Bus
默认都是同步执行,支持通过设置 Executors
的方式修改成异步事件。
核心组件:
- 事件类:定义事件,继承
ApplicationEvent
的类成为一个事件类。 - 发布者:发布事件,通过
ApplicationEventPublisher
发布事件。 - 监听者:监听并处理事件,实现
ApplicationListener
接口或者使用@EventListener
注解。
由于代码过多,可以直接github 下载 进行阅读,这里只贴部分关键代码:
在发布事件方法:AbstractApplicationContext#publishEvent
会走到 下图中的 SimpleApplicationEventMulticaster#multicastEvent
执行具体任务的调度。 这里的设计与 上面的 EventBus
如出一辙,在执行时,通过区分线程池进行实际的调度,从而决定 同步|异步
!
3.1、异步之ApplicationEventMulticaster
修改 ApplicationEventMulticaster
设置初始线程池, 和EventBus
的解决思路一致:
@Order(0)
@Bean
public ApplicationEventMulticaster applicationEventMulticaster() {
SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
eventMulticaster.setTaskExecutor(Executors.newFixedThreadPool(10));
return eventMulticaster;
}
在Spring 上下文初始化的时候,会将这一个bean,加载到上下文中,
存在的问题: 由于将整个上下文的 ApplicationEventMulticaster
都替换了,那么在事件处理的流程上,所有的事件都会以异步的方式进行,那么风险的把控就很难做好。不建议,但能用(毕竟经受过考验
)
3.2、异步之@Async
通过实现 AsyncConfigurer
接口,自定义线程池,对切面方法,执行反射代理
org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
核心原理
进程间异步调用
Dubbo 异步调用
在rpc
框架中,我们普遍使用的都是同步调用
模式,但是在 Dubbo
的底层实现中,反而是以 异步调用
的方式实现的。先来简单看看调用链路
:
核心代码在
com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
消息队列异步解耦
在介绍 EventBus
的时候, 我查看了很多文章,都将EventBus
的设计模式
描述为发布-订阅
模式。首先这个描述是错误的,然后我们来对比一下他们的区别:
从表面上看:
- 观察者模式里,只有两个角色 —— 观察者 + 被观察者
- 而发布订阅模式里,却不仅仅只有发布者和订阅者两个角色,还有一个经常被我们忽略的 —— 经纪人Broker
往更深层次讲:
- 观察者和被观察者,是松耦合的关系
- 发布者和订阅者,则完全不存在耦合
发布-订阅
模式:
消息队列
能够帮我们做到解耦
的效果,通过消息中间件,如 RocketMQ
,kafka
和rabbitMQ
等; 完成消息的接收和推送,从而达到异步处理
的效果。
以上是关于《高性能利器》--异步调用有哪些实现方式?的主要内容,如果未能解决你的问题,请参考以下文章
第十四篇商城系统-异步处理利器-CompletableFuture