京东一面:20种异步,你知道几种? 含协程
Posted 40岁资深老架构师尼恩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了京东一面:20种异步,你知道几种? 含协程相关的知识,希望对你有一定的参考价值。
背景说明:
异步,作为性能调优核心方式之一,经常被用于各种高并发场景。
很多场景多会使用到异步,比如:
场景1: 超高并发 批量 写 mysql 、批量写 elasticSearch
场景2: 超高并发 批量 IO
场景3: 超高并发 发送短信、发邮件
场景4: 超高并发 发送消息
场景5: 超高吞吐 生产者、 消费者 场景
场景6: 超高吞吐 发布、 订阅 场景
场景7: 分布式的 通知场景
场景8:异步回调场景
场景9:其他的 异步场景, 不知道能列举出多少,总之非常多
总之,异步,作为性能调优核心方式之一,经常被用于各种高并发场景。
所以,异步是一个非常、非常核心的面试知识点。
在40岁老架构师 尼恩的社群中,其相关面试题是一个非常、非常高频的交流话题。
其大概的出题形式有:
形式1:异步化作为应用调优的一个常用方式,你知道具体有几种方式实现吗?
形式2:异步的实现方式有几种?
形式3:异步调用,你知道哪些?
形式4:异步调用,你用过哪些?
形式5: 实现异步的10多种方式,你用过哪些?
形式6: 实现异步的10多种方式,你知道几个? (出自社群 美团一面)
形式7: 请参考 《尼恩Java面试宝典》,还有很多变种…。
这里尼恩给大家做一下系统化、体系化的梳理,
使得大家可以充分展示一下大家雄厚的 “技术肌肉”,让面试官爱到 “不能自已、口水直流”。
并把这个 题目以及参考答案,收入咱们的 《尼恩Java面试宝典》,
供后面的小伙伴参考,提升大家的 3高 架构、设计、开发水平。
注:本文以 PDF 持续更新,最新尼恩 架构笔记、面试题 的PDF文件,请从这里获取:码云
首先、什么是异步?
同步:
调用方在调用过程中,持续阻塞,一直到返回结果。
同步获取结果的方式是: 主动等待。
异步:
调用方在调用过程中,不会阻塞, 不直接等待返回结果, 而是执行其他任务。
异步获取结果的方式是 : 被动通知或者 被动回调。
然后,梳理一下异步的20种实现方式
- 新建线程Thread 实现异步
- 线程池化 实现异步
- Future 阻塞式异步
- guava 回调式异步
- Netty 回调式异步
- Servlet 3.0 异步
- CompletableFuture 回调式异步
- JDK 9 Flow 响应式编程
- RxJava 响应式 异步
- Reactor 响应式 异步
- Spring注解@Async 异步
- EventBus 框架 发布订阅模式异步
- Spring ApplicationEvent 事件 发布订阅模式
- RocketMq 消息队列 分布式 发布订阅模式(Pub/Sub) 异步
- Redis 消息队列 分布式 发布订阅模式(Pub/Sub) 异步
- Distruptor 框架异步
- ForkJoin 框架异步
- RocketMQ源码中ServiceThread 能急能缓的高性能异步
- Kotlin 协程 异步
- Project Loom 协程 异步
方式1:新建线程Thread异步
在《Java 高并发核心 编程 卷2 加强版》 9.2.1 小节中,介绍了一个经典的 异步案例《泡茶的案例》:
分别设计三条线程:泡茶线程(MainThread,主线程)、烧水线程(HotWarterThread)、清洗线程(WashThread)。
-
泡茶线程的工作是:启动清洗线程、启动烧水线程,等清洗、烧水的工作完成后,泡茶喝;
-
清洗线程的工作是:洗茶壶、洗茶杯;
-
烧水线程的工作是:洗好水壶,灌上凉水,放在火上,一直等水烧开。
其中,负责烧水的线程HotWarterThread、负责WashThread 的线程,都是通过线程Thread异步 的方式,完成异步执行的。
具体的内容和示意图,来自于 《Java 高并发核心 编程 卷2 加强版》 9.2.1 小节。 示意图如下:
线程Thread异步 的知识非常多,是Java 高并发的基础知识,
具体请参见 《Java 高并发核心 编程 卷2 加强版》1.3节,示意图如下:
《Java 高并发核心 编程 卷2 加强版》 电子版PDF是免费的,版本不断升级,提取最新版本可以找尼恩即可。
方式2:线程池化 异步
Thread
线程和OS内核线程,是一一对应的关系,频繁的创建、销毁,浪费系统资源,并且涉及到进行内核态和用户态的切换,这一切的一切,都是低性能的。
如何提升性能呢?可以将 线程池化 ,就是线程池。
我们可以采用线程池, 下面的代码,是来自于经历过 双十一 100Wqps 超高并发考验的 JD Hotkey框架的源码:
public class AsyncPool
private static ExecutorService threadPoolExecutor = Executors.newCachedThreadPool();
public static void asyncDo(Runnable runnable)
threadPoolExecutor.submit(runnable);
public static void shutDown()
threadPoolExecutor.shutdown();
然后,可以将业务逻辑封装到Runnable
或Callable
中,交由线程池来执行。
下面的业务代码, 是尼恩的第26章视频中基于 JD hotkey 源码二次定制的、 用于 实现三级缓存的 数据一致性的 异步刷新代码:
package com.jd.platform.hotkey.worker.netty.pusher;
....省略 import
/**
* 推送到各客户端服务器
*
* @author wuweifeng wrote on 2020-02-24
* @version 1.0
*/
@Component
public class RocketMqPusher implements IPusher
/**
* 热key集中营
*/
private static LinkedBlockingQueue<HotKeyModel> hotKeyStoreQueue = new LinkedBlockingQueue<>();
private static Logger logger = LoggerFactory.getLogger(RocketMqPusher.class);
@Resource
MqSender mqSender;
public static class MqSender
public DefaultMQProducer mqProducer;
public String topic;
public MqSender(DefaultMQProducer mqProducer, String topic)
this.mqProducer = mqProducer;
this.topic = topic;
public void sendToMq(Command cmd)
Message msg = new Message(topic, "", "", cmd.json().getBytes());
try
mqProducer.send(msg, 100000);
catch (Exception e)
logger.error("Failed to publish to RocketMQ", cmd.json(), e);
/**
* 给客户端推key信息
*/
@Override
public void push(HotKeyModel model)
hotKeyStoreQueue.offer(model);
@Override
public void remove(HotKeyModel model)
// push(model);
/**
* 和dashboard那边的推送主要区别在于,给app推送每10ms一次,dashboard那边1s一次
*/
@PostConstruct
public void batchPushToClient()
AsyncPool.asyncDo(() ->
while (true)
try
List<HotKeyModel> tempModels = new ArrayList<>();
//每10ms推送一次
Queues.drain(hotKeyStoreQueue, tempModels, 10, 10, TimeUnit.MILLISECONDS);
if (CollectionUtil.isEmpty(tempModels))
continue;
String[] keys = tempModels.stream().map(m -> m.getKey()).toArray(String[]::new);
mqSender.sendToMq(new Command(Command.OPT_HOT_KEY, null, keys));
catch (Exception e)
e.printStackTrace();
);
线程Thread异步 的知识非常多,是Java 高并发的基础知识,
具体请参见 《Java 高并发核心 编程 卷2 加强版》1.6节,示意图如下:
《Java 高并发核心 编程 卷2 加强版》 电子版PDF是免费的,版本不断升级,提取最新版本可以找尼恩即可。
方式3:Future 阻塞式异步
为了获取异步线程的返回结果,以及更好的对异步线程的干预,Java在1.5版本之后提供了一种新的多线程的创建方式—— FutureTask方式。
FutureTask方式包含了一系列的Java相关的类,处于java.util.concurrent包中。
使用FutureTask方式进行异步调用时,所涉及的重要组件为FutureTask类和Callable接口。
Future 的调用方式,属于阻塞式异步
主要原因在于,在获取异步线程处理结果时,需要主线程主动通过Future.get() 去获取,
如果异步线程没有执行完,那么Future.get() 会阻塞 调用线程,一直到超时。
参考的代码如下:
具体的内容和示意图,来自于 《Java 高并发核心 编程 卷2 加强版》 9.2.1 小节。 示意图如下:
阻塞式异步Future的不足之处
Future的不足之处的包括以下几点:
-
无法被动接收异步任务的计算结果:
虽然我们可以主动将异步任务提交给线程池中的线程来执行,但是待异步任务执行结束之后,主线程无法得到任务完成与否的通知,它需要通过get方法主动获取任务执行的结果。
-
Future件彼此孤立:
有时某一个耗时很长的异步任务执行结束之后,你想利用它返回的结果再做进一步的运算,该运算也会是一个异步任务,两者之间的关系需要程序开发人员手动进行绑定赋予,Future并不能将其形成一个任务流(pipeline),每一个Future都是彼此之间都是孤立的,所以才有了后面的CompletableFuture,CompletableFuture就可以将多个Future串联起来形成任务流。
-
Futrue没有很好的错误处理机制:
截止目前,如果某个异步任务在执行发的过程中发生了异常,调用者无法被动感知,必须通过捕获get方法的异常才知晓异步任务执行是否出现了错误,从而在做进一步的判断处理。
伪异步 与 纯异步
异步调用目的在于防止当前业务线程被阻塞。
但是 Future 阻塞式异步 属于 伪异步
伪异步 就是 将任务包装为Runnable/ Callable 作为Biz业务线程(被调用线程)的任务去执行,并调用方阻塞等待,当前Biz 线程不阻塞;
纯异步为回调式 异步。他们的区别不在于是否将请求放入另一个线程池执行,而在于是否有线程阻塞等待Response。
在前面的异步阻塞版本的泡茶喝的实现中:
-
泡茶线程是调用线程,
-
烧水(或者清洗)线程是被调用线程,
-
调用线程和被调用线程之间是一种主动关系,而不是被动关系。
-
泡茶线程需要主动获取烧水(或者清洗)线程的执行结果,二者协同的方式是调用方阻塞。
为什么说二者协同的方式是调用方阻塞?调用方线程需要通过join()或Future.get()阻塞式的干预 异步操作或者获取 异步结果,这里,是阻塞模式的异步, 伪异步
这种调用方线程的阻塞,是 线程资源的一种浪费。
线程资源,是宝贵的。怎么充分的利用线程资源呢?
有效方式之一: 回调模式的 异步。实现 纯纯的异步
方式4:guava 回调式异步
由于JDK在1.8之前没有 回调式异步组件,于是出现了很多 开源的 回调式异步组件。
比较常用的是 guava 的回调式异步。
Guava是Google提供的Java扩展包,它提供了一种异步回调的解决方案。
Guava中与异步回调相关的源码处于com.google.common.util.concurrent包中。
包中的很多类都用于对java.util.concurrent的能力扩展和能力增强。
比如,Guava的异步任务接口ListenableFuture扩展了Java的Future接口,实现了异步回调的的能力。
使用 Guava 组件实现 异步回调模式 的泡茶喝 实例, 具体的原理和代码请参见《Java 高并发核心 编程 卷2 加强版》9.5.4 章, 示意图如下:
出于 篇幅原因,这里不做赘述, 请大家看PDF电子书 (可找尼恩面试获取最新版), 书里非常细致。
方式5:Netty 回调式异步
由于JDK在1.8之前没有 回调式异步组件,于是出现了很多 开源的 回调式异步组件。
Netty 也算其中之一。
Netty 是 一个 著名的高性能NIO王者框架, 是 IO 的王者组件。 具体,请参见尼恩梳理的四大王者组件。
Netty 除了作为NIO框架之王,其子模也是可以单独使用的,比如说异步回调模块。
Netty 的 回调式异步组件 更加牛掰,为啥呢?
通过Netty源码可以知道: Netty 的 回调式异步组件不光提供了外部的回调监听设置,而且可以在异步代码中, 通过Promise接口,可以对回调结果进行干预,比如说在进行回调之前,执行一些其他的操作。
当然,Netty的源码更加复杂,这部分内容, 请参考尼恩的《第21章视频,彻底穿透Netty源码视频》。
使用 Netty 的 回调式异步组件 实现 异步回调模式 的泡茶喝 实例, 具体的原理和代码请参见《Java 高并发核心 编程 卷2 加强版》9.5.4 章, 示意图如下:
出于 篇幅原因,这里不做赘述, 请大家看PDF电子书 (可找尼恩面试获取最新版), 书里非常细致。
Callback Hell(回调地狱)问题
无论是 Google Guava 包中的 ListenableFuture,还是 Netty的 GenericFutureListener,都是需要设置专门的Callback 回调钩子
Guava 包中的 ListenableFuture,设置Callback 回调钩子的实例如下:
ListenableFuture<Boolean> wFuture = gPool.submit(wJob);
Futures.addCallback(wFuture, new FutureCallback<Boolean>()
public void onSuccess(Boolean r)
if (!r)
Print.tcfo("杯子洗不了,没有茶喝了");
else
countDownLatch.countDown();
public void onFailure(Throwable t)
Print.tcfo("杯子洗不了,没有茶喝了");
);
调用方通过 Futures.addCallback() 添加处理结果的回调函数。
这样避免获取并处理异步任务执行结果阻塞调起线程的问题。
Callback 是将任务执行结果作为接口的入参,在任务完成时回调 Callback 接口,执行后续任务,从而解决纯 Future 方案无法方便获得任务执行结果的问题。
但 Callback 产生了新的问题,那就是代码可读性的问题。
因为使用 Callback 之后,代码的字面形式和其所表达的业务含义不匹配,即业务的先后关系到了代码层面变成了包含和被包含的关系。
因此,如果大量使用 Callback 机制,将使大量的存在先后次序的业务逻辑,在代码形式上,转换成层层嵌套,
从而导致:业务先后次序在代码维度被打乱,最终造成代码不可理解、可读性差、难以理解、难以维护。
这便是所谓的 Callback Hell(回调地狱)问题。
Callback Hell 问题可以从两个方向进行一定的解决:
-
一是链式调用
-
二是事件驱动机制。
前被 CompletableFuture、反应式编程等技术采用,前者被如 EventBus、Vert.x 所使用。
方式6:Servlet 3.0 异步
梳理一下: Callback 异步回调的使用场景。
Callback 真正体现价值,是它与 NIO 技术结合之后。
- CPU 密集型场景,采用 Callback 回调没有太多意义;
- IO 密集型场景,如果是使用 BIO模式,Callback 同样没有意义,因为一个连接一个线程,IO线程是因为 IO 而阻塞。
- IO 密集型场景,如果是使用 NIO 模式,使用Callback 才有意义。 NIO是少量IO线程负责大量IO通道,IO线程需要避免线程阻塞,所以,也必须使用 Callback ,才能使应用得以被开发出来。
所以,高性能的 NIO 框架如 Netty ,都是基于 Callback 异步回调的。
但是,在微服务流行的今天,Netty 却没有在WEB服务器中占据统治地位。
微服务系统中,多级服务调用很常见,一个服务先调 A,再用结果 A 调 B,然后用结果 B 调用 C,等等。
如果使用Netty 作为底层服务器,IO 线程能大大降低,能处理的连接数(/请求数)也能大大增加,那么,为啥Netty 却没有在WEB服务器中占据统治地位呢?
这其中的难度来自两方面:
- 一是 NIO 和 Netty 本身的技术难度,
- 二是 Callback hell:Callback 风格所导致的代码理解和维护的困难。
因此,Netty 通常用于在基础架构层面,在业务系统中应用较少。
这也是大厂小伙人人要求 精通Netty,而中小厂小伙伴,不怎么认识Netty的原因。
当然,作为IO之王,学习Netty对应提升大家的内功,是至关重要的。
Netty的知识,具体请参见《Java高并发核心编程 卷1 加强版》,很多小伙伴,靠此书入的门。
直接使用 Netty 开发WEB应用会遇到技术难度挑战、以及 Callback Hell 问题。
所以,Servlet 3.0 提供了一个异步解决方案。
什么是servlet异步请求
Servlet 3.0 之前,一个普通 Servlet 的主要工作流程大致如下:
(1)Servlet 接收到请求之后,可能需要对请求携带的数据进行一些预处理;
(2)调用业务接口的某些方法,以完成业务处理;
(3)根据处理的结果提交响应,Servlet 线程结束。
其中第二步处理业务逻辑时候很可以碰到比较耗时的任务,此时servlet主线程会阻塞等待完成业务处理,
对于并发比较大的请求可能会产生性能瓶颈,则servlet3.0之后再此处做了调整,引入了异步的概念。
(1)Servlet 接收到请求之后,可能需要对请求携带的数据进行一些预处理;
(2)调用业务接口的某些方法过程中request.startAsync()请求,获取一个AsyncContext
(3)紧接着servlet线程退出(回收到线程池),但是响应response对象仍旧保持打开状态,新增线程会使用AsyncContext处理并响应结果。
(4)AsyncContext处理完成触发某些监听通知结果
@WebServlet(urlPatterns = "/demo", asyncSupported = true)
public class AsyncDemoServlet extends HttpServlet
@Override
public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException, ServletException
// Do Something
AsyncContext ctx = req.startAsync();
startAsyncTask(ctx);
private void startAsyncTask(AsyncContext ctx)
requestRpcService(result ->
try
PrintWriter out = ctx.getResponse().getWriter();
out.println(result);
out.flush();
ctx.complete();
catch (Exception e)
e.printStackTrace();
);
Servlet 3.0 的出现,解决了在过去基于 Servlet 的 Web 应用中,接受请求和返回响应必须在同一个线程的问题,实现了如下目标:
- 可以避免了 Web 容器的线程被阻塞挂起
- 使请求接收之后的任务处理可由专门线程完成
- 不同任务可以实现线程池隔离
- 结合 NIO 技术实现更高效的 Web 服务
除了直接使用 Servlet 3.0,也可以选择 Spring MVC 的 Deferred Result。
示例:Spring MVC DeferredResult
@GetMapping("/async-deferredresult")
public DeferredResult<ResponseEntity<?>> handleReqDefResult(Model model)
LOG.info("Received async-deferredresult request");
DeferredResult<ResponseEntity<?>> output = new DeferredResult<>();
ForkJoinPool.commonPool().submit(() ->
LOG.info("Processing in separate thread");
try
Thread.sleep(6000);
catch (InterruptedException e)
output.setResult(ResponseEntity.ok("ok"));
);
LOG.info("servlet thread freed");
return output;
Servlet 3.0 的技术局限
Servlet 3.0 并不是用来解决前面提到的 Callback Hell 问题的,它只是降低了异步 Web 编程的技术门槛。
对于 Callback Hell 问题,使用 Servlet 3.0 或类似技术时同样会遇到。
解决 Callback Hell 还需另寻他法。
方式7:回调式 异步CompletableFuture
JDK 1.8之前并没有实现回调式的异步,CompletableFuture是JDK 1.8引入的实现类,实现了JDK内置的异步回调模式异步。
CompletableFuture的创新是:通过 链式调用,解决 Callback Hell(回调地狱)问题, 让代码变得的可理解行更强,可读性 更强。
CompletableFuture 该类实现了Future和CompletionStage两个接口。
该类的实例作为一个异步任务,可以在自己异步执行完成之后触发一些其他的异步任务,从而达到异步回调的效果。
使用CompletableFuture实现泡茶喝实例的实现,参考如下:
package com.crazymakercircle.completableFutureDemo;
import com.crazymakercircle.util.Print;
import java.util.concurrent.CompletableFuture;
import static com.crazymakercircle.util.ThreadUtil.sleepSeconds;
public class DrinkTea
private static final int SLEEP_GAP = 3;//等待3秒
public static void main(String[] args)
// 任务 1
CompletableFuture<Boolean> washJob =
CompletableFuture.supplyAsync(() ->
Print.tcfo("洗茶杯");
//线程睡眠一段时间,代表清洗中
sleepSeconds(SLEEP_GAP);
Print.tcfo("洗完了");
return true;
);
// 任务 2
CompletableFuture<Boolean> hotJob =
CompletableFuture.supplyAsync(() ->
Print.tcfo("洗好水壶");
Print.tcfo("烧开水");
//线程睡眠一段时间,代表烧水中
sleepSeconds(SLEEP_GAP);
Print.tcfo("水开了");
return true;
);
// 任务 3:任务 1 和任务 2 完成后执行:泡茶
CompletableFuture<String> drinkJob =
washJob.thenCombine(hotJob, (hotOk, washOK) ->
if (hotOk && washOK)
Print.tcfo("泡茶喝,茶喝完");
return "茶喝完了";
return "没有喝到茶";
);
// 等待任务 3 执行结果
Print.tco(drinkJob.join());
具体的内容和示意图,来自于 《Java 高并发核心 编程 卷2 加强版》10章。 示意图如下:
出于 篇幅原因,这里不做赘述, 请大家看PDF电子书 (可找尼恩面试获取最新版), 书里非常细致。
方式8:JDK 9 Flow 响应式编程
但是 JDK 8 的 CompletableFuture 属于链式调用,它在形式上带有一些响应式编程的函数式代码风格。
因为 Callback Hell 对代码可读性有很大杀伤力,从开发人员的角度来讲,反应式编程技术和链式调用一样,使得代码可读性要比 Callback 提升了许多。
响应式流从2013年开始,作为提供非阻塞背压的异步流处理标准的倡议。
Reactive Stream是一套基于发布/订阅模式的数据处理规范。
更确切地说,Reactive流目的是“找到最小的一组接口,方法和协议,用来描述必要的操作和实体以实现这样的目标:以非阻塞背压方式实现数据的异步流”。
**响应式流(Reactive Streams)**是一个响应式编程的规范,用来为具有非阻塞背压(Back pressure)的异步流处理提供标准,用最小的一组接口、方法和协议,用来描述必要的操作和实体。这里涉及到一个关键概念叫 Backpressure,国内大部分翻译为背压,我们先来了解这是什么。
响应式编程,其实就是对数据流的编程,而对流的处理对数据流的变化进行响应,是通过异步监听的方式来处理的。既然是异步监听,就涉及到监听事件的发布者和订阅者,数据流其实就是由发布者生产,再由一个或多个订阅者进行消费的元素(item)序列。
那么,如果发布者生产元素的速度和订阅者消费元素的速度不一样,是否会出现问题呢?其实就两种情况:
- 发布者生产的速度比订阅者消费的速度慢,那生产的元素可以及时被处理,订阅者处理完只要等待发布者发送下一元素即可,这不会产生什么问题。
- 发布者生产的速度比订阅者消费的速度快,那生产的元素无法被订阅者及时处理,就会产生堆积,如果堆积的元素多了,订阅者就会承受巨大的资源压力(pressure)而有可能崩溃。
要应对第二种情况,就需要进行流控制(flow control)。
流控制有多种方案,其中一种机制就是 Back pressure,即背压机制,其实就是下游能够向上游反馈流量需求的机制。
如果生产者发出的数据比消费者能够处理的数据量大而且快时,消费者可能会被迫一直再获取或处理数据,消耗越来越多的资源,从而埋下潜在的风险。为了防止这一点,消费者可以通知生产者降低生产数据的速度。生产者可以通过多种方式来实现这一要求,这时候我们就会用到背压机制。
采用背压机制后,消费者告诉生产者降低生产数据速度并保存元素,知道消费者能够处理更多的元素。使用背压可以有效地避免过快的生产者压制消费者。如果生产者要一直生产和保存元素,使用背压也可能会要求其拥有无限制的缓冲区。生产者也可以实现有界缓冲区来保存有限数量的元素,如果缓冲区已满可以选择放弃。
背压的实现方式
背压的实现方式有两种:
- 一种是阻塞式背压
- 另一种是非阻塞式背压。
1、阻塞式背压
阻塞式背压是比较容易实现的,例如:当生产者和消费者在同一个线程中运行时,其中任何一方都将阻塞其他线程的执行。这就意味着,当消费者被执行时,生产者就不能发出任何新的数据元素。因而也需要一中自然地方式来平衡生产数据和消费数据的过程。
在有些情况下,阻塞式背压会出现不良的问题,比如:当生产者有多个消费者时,不是所有消费者都能以同样的速度消费消息。当消费者和生产者在不同环境中运行时,这就达不到降压的目的了。
2、非阻塞式背压
背压机制应该以非阻塞式的方式工作,实现非阻塞式背压的方法是放弃推策略,采用拉策略。生产者发送消息给消费者等操作都可以保存在拉策略当中,消费者会要求生产者生成多少消息量,而且最多只能发送这些量,然后等到更多消息的请求。
OK,Backpressure 解释完毕。 其实,《Java 高并发核心编程卷3 加强版》介绍得更加清楚,大家可以系统化的去看看书。
JDK 8 的 CompletableFuture 不算是反应式编程,不遵守Reactive Stream (响应式流/反应流) 规范。
JDK 9 Flow 是JDK对Reactive Stream (响应式流/反应流) 的实现,
Java 9中新增了反应式/响应式编程的Api-Flow
。
在Flow
中存在Publisher(发布者)
、Subscriber(订阅者)
、Subscription(订阅)
和`Processor(处理器)。
Flow`结构如下:
JDK 9 Flow 旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者需要有无限制的缓冲区或丢弃。
当然,实施响应式编程,需要完整的解决方案,单靠 Flow 是不够的,还是需要Netflix RxJava、 Project Reactor 这样的完整解决方案。
但是, JDK 层面的技术能提供统一的技术抽象和实现,在统一技术方面还是有积极意义的。
所以,这里不对 JDK 9 Flow 做介绍。
方式9:RxJava 响应式 异步
在JDK 9 Flow 之前,响应式编程 的框架,早就存在。
比如说, 席卷了android 端编程的 RxJava 框架。RxJava 是一种响应式编程,来创建基于事件的异步操作库。
这个组件,是 Netflix的杰作,也叫作Netflix RxJava。
这个框架,在Java 后端的中间件中,也有广泛使用,比如在Hystrix 源码中,就用大量用到。
总之,RxJava 框架很重要,具体请参见 《Java 高并发核心编程 卷3 加强版》
使用 RxJava 基于事件流的链式调用、代码 逻辑清晰简洁。
package com.crazymakercircle.completableFutureDemo;
import com.crazymakercircle.util.Print;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
import static com.crazymakercircle.util.ThreadUtil.sleepMilliSeconds;
import static com.crazymakercircle.util.ThreadUtil.sleepSeconds;
public class IntegrityDemo
/**
* 模拟模拟RPC调用1
*/
public String rpc1()
//睡眠400ms,模拟执行耗时
sleepMilliSeconds(600);
Print.tcfo("模拟RPC调用:服务器 server 1");
return "sth. from server 1";
/**
* 模拟模拟RPC调用2
*/
public String rpc2()
//睡眠400ms,模拟执行耗时
sleepMilliSeconds(600);
Print.tcfo("模拟RPC调用:服务器 server 2");
return "sth. from server 2";
@Test
public void rpcDemo() throws Exception
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() ->
return rpc1();
);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> rpc2());
CompletableFuture<String> future3 = future1.thenCombine(future2,
(out1, out2) ->
return out1 + " & " + out2;
);
String result = future3.get();
Print.tco("客户端合并最终的结果:" + result);
@Test
public void rxJavaDemo() throws Exception
Observable<String> observable1 = Observable.fromCallable(() ->
return rpc1();
).subscribeOn(Schedulers.newThread());
Observable<String> observable2 = Observable
.fromCallable(() -> rpc2()).subscribeOn(Schedulers.newThread());
Observable.merge(observable1, observable2)
.observeOn(Schedulers.newThread())
.toList()
.subscribe((result) -> Print.tco("客户端合并最终的结果:" + result));
sleepSeconds(Integer.MAX_VALUE);
如果需要从设计模式的高度理解Rxjava、或者响应式编程,请参考 《Java 高并发核心 编程 卷3 加强版》5章。 示意图如下:
方式10:Reactor 响应式 异步
目前,在 Java 领域实现了反应式编程的技术除了 Netflix RxJava ,还有 Spring 的 Project Reactor。
Netflix 网飞 的RxJava 出现时间更早,在前端开发领域应用的比后端更要广泛一些。
Spring 的 Project Reactor的 3.0 版本作为 Spring 5 的基础,在17年底发布,推动了后端领域反应式编程的发展。
关于Reactor 响应式异步的 内容,请参考尼恩的另外一篇博客,
这篇文章,被小伙伴认为是全网介绍 Reactor 组件讲得最好的文章,
其实尼恩的其他文章,非常优质,大家都可以看看。
由于性能比较高,响应式编程越来越普及, 建议大家及早掌握。
尼恩的50个群中,很多开始响应式编程的小伙伴反馈: 入手难、后面爽。
方式11:Spring的@Async异步
在Spring中,使用@Async标注某方法,可以使该方法变成异步方法,这些方法在被调用的时候,将会在独立的线程中进行执行,调用者不需等待该方法执行完成。
但在Spring中使用@Async注解,需要使用@EnableAsync来开启异步调用。
一个 AsyncService参考代码如下:
public interface AsyncService
MessageResult sendSms(String callPrefix, String mobile, String actionType, String content);
MessageResult sendEmail(String email, String subject, String content);
@Slf4j
@Service
public class AsyncServiceImpl implements AsyncService
@Autowired
private IMessageHandler mesageHandler;
@Override
@Async("taskExecutor")
public MessageResult sendSms(String callPrefix, String mobile, String actionType, String content)
try
Thread.sleep(1000);
mesageHandler.sendSms(callPrefix, mobile, actionType, content);
catch (Exception e)
log.error("发送短信异常 -> ", e)
@Override
@Async("taskExecutor")
public sendEmail(String email, String subject, String content)
try
Thread.sleep(1000);
mesageHandler.sendsendEmail(email, subject, content);
catch (Exception e)
log.error("发送email异常 -> ", e)
@Async注解,默认使用系统自定义线程池。
在实际项目中,推荐等方式是是使用自定义线程池的模式。
可在项目中设置多个线程池,在异步调用的时候,指明需要调用的线程池名称,比如:@Async(“taskName”)
自定义异步线程池的代码如下
/**
* 线程池参数配置,多个线程池实现线程池隔离
@EnableAsync
@Configuration
public class TaskPoolConfig
/**
* 自定义线程池
*
**/
@Bean("taskExecutor")
public Executor taskExecutor()
//返回可用处理器的Java虚拟机的数量 12
int i = Runtime.getRuntime().availableProcessors();
System.out.println("系统最大线程数 : " + i);
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(16);
//最大线程数
executor.setMaxPoolSize(20);
//配置队列容量,默认值为Integer.MAX_VALUE
executor.setQueueCapacity(99999);
//活跃时间
executor.setKeepAliveSeconds(60);
//线程名字前缀
executor.setThreadNamePrefix("asyncServiceExecutor -");
//设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行
executor.setAwaitTerminationSeconds(60);
//等待所有的任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
方式12:EventBus 发布订阅模式异步
实际开发中,常常 通过事件总线EventBus/AsyncEventBus进行JAVA模块解耦 ,
比如,在顶级开源组件 JD hotkey的源码中, 就多次用到 EventBus/AsyncEventBus进行JAVA模块解耦
掌握了 EventBus ,在平时的开发中,多了一个神器
EventBus 是 Guava 的事件处理机制,是观察者模式(生产/消费模型)的一种实现。
EventBus是google的Guava库中的一个处理组件间通信的事件总线,它基于发布/订阅模式,实现了多组件之间通信的解耦合,事件产生方和事件消费方实现解耦分离,提升了通信的简洁性。
观察者模式在我们日常开发中使用非常广泛,
例如在订单系统中,订单状态或者物流信息的变更会向用户发送APP推送、短信、通知卖家、买家等等;审批系统中,审批单的流程流转会通知发起审批用户、审批的领导等等。
Observer模式也是 JDK 中自带就支持的,其在 1.0 版本就已经存在 Observer,
不过随着 Java 版本的飞速升级,其使用方式一直没有变化,许多程序库提供了更加简单的实现,例如 Guava EventBus、RxJava、EventBus 等
为什么要用 EventBus ,其优点 ?
EventBus 优点
- 相比 Observer 编程简单方便
- 通过自定义参数可实现同步、异步操作以及异常处理
- 单进程使用,无网络影响
缺点
- 只能单进程使用
- 项目异常重启或者退出不保证消息持久化
如果需要分布式使用还是需要使用 MQ
使用事件总线的场景
当一个事件的发生(事件产生方),需要触发很多事件(事件消费方)的时候,我们通常会在事件产生方中,分别的去调用那些事件消费方,这样往往是很浪费资源。事件的产生方与事件的消费方,产生了极大的耦合,如果我们要改动某一个事件消费方,我们很可能还要改动事件的产生方。
在工作中,经常会遇见使用异步的方式来发送事件,或者触发另外一个动作:经常用到的框架是MQ(分布式方式通知); 如果是同一个jvm里面通知的话,就可以使用EventBus 事件总线。
由于EventBus使用起来简单、便捷,因此,工作中会经常用到。
EventBus 是线程安全的,分发事件到监听器,并提供相应的方式让监听器注册它们自己。
EventBus允许组件之间进行 “发布-订阅” 式的通信,而不需要这些组件彼此知道对方。
EventBus是专门设计用来替代传统的Java进程内的使用显示注册方式的事件发布模式。
EventBus不是一个通用的发布-订阅系统,也不是用于进程间通信。
EventBus有三个关键要素:
1、事件(Event)
事件是EventBus之间相互通信的基本单位,一个Event可以是任何类型。
对,没错,就是Object,只要你想将任意一个Bean作为事件,这个类不需要做任何改变,就可以作为事件Event。不过在项目中不会这么随便(除非对代码严谨度没什么要求。。)
,一般会定义特定的事件类,类名以Event作为后缀,里面定义一些变量或者函数等。
2、事件发布者(Publisher)
事件发布者,就是发送事件到EventBus事件总线的一方,事件发布者调用Post()方法,将事件发给EventBus。
你可以在程序的任何地方,调用EventBus的post()方法,发送事件给EventBus,由EventBus发送给订阅者们。
3、事件订阅者(Subscriber)
事件订阅者,就是接收事件的一方,这些订阅者需要在自己的方法上,添加@Subscribe注解声明自己为事件订阅者。不过只声明是不够的,还需要将自己所在的类,注册到EventBus中,EventBus才能扫描到这个订阅者。
关于EventBus 的 原理和实操内容,请参考尼恩的另外一篇博客,
通过事件总线EventBus/AsyncEventBus进行JAVA模块解耦 (史上最全)
方法13:Spring ApplicationEvent事件实现异步
Spring内置了简便的事件机制,原理和EventBus 差不多
通过Spring ApplicationEvent事件, 可以非常方便的实现事件驱动,核心类包括
- ApplicationEvent,具体事件内容,事件抽象基类,可继承该类自定义具体事件
- ApplicationEventPublisher,事件发布器,可以发布ApplicationEvent,也可以发布普通的Object对象
- ApplicationListener,事件监听器,可以使用注解
@EventListener
- TransactionalEventListener,事务事件监听,可监听事务提交前、提交后、事务回滚、事务完成(成功或失败)
使用示例:不定义事件,直接发布Object对象,同步
1、定义发送事件对象
public class UserEntity
private long id;
private String name;
private String msg;
2、定义事件监听器
可以添加条件condition,限制监听具体的事件
@Slf4j
@Component
public class RegisterListener
@EventListener(condition = "#entity.id != null and #entity.async==false ")
public void handlerEvent(UserEntity entity)
try
// 休眠5秒
TimeUnit.SECONDS.sleep(5);
catch (InterruptedException e)
e.printStackTrace();
log.info("handlerEvent: ", entity);
3、定义发送接口以及实现类
public interface IRegisterService
public void register(String name);
12345
@Service
public class RegisterServiceImpl implements IRegisterService
@Resource
private ApplicationEventPublisher applicationEventPublisher;
@Override
public void publish(String name)
UserEntity entity = new UserEntity();
entity.setName(name);
entity.setId(1L);
entity.setMsg("新用户注册同步调用");
applicationEventPublisher.publishEvent(entity)以上是关于京东一面:20种异步,你知道几种? 含协程的主要内容,如果未能解决你的问题,请参考以下文章