异步编程的正确姿势
Posted 开源拾椹
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了异步编程的正确姿势相关的知识,希望对你有一定的参考价值。
“ 引言部分。java擅长于并发处理,异步编程处处可见,比如平时使用的Stream API,使用的tomcat中间件,网络爬虫,spark/flink计算框架等,都用到了并行计算(异步),在目前来说,多线程是唯一提高Java并行处理速度的方式(当然更好的方式是使用协程,但是java没有协程这个特性),如何编写一个正确的异步计算程序相当重要,那本文帮助大家编写一个正确的异步并行计算程序。”
01
—
准备知识 - 异步计算
所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。
02
—
异步计算方式 - 线程池
诸如 Web 服务器、数据库服务器、文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务。请求以某种方式到达服务器,这种方式可能是通过网络协议(例如 HTTP、FTP 或 POP)、通过 JMS 队列或者可能通过轮询数据库。不管请求如何到达,服务器应用程序中经常出现的情况是:单个任务处理的时间很短而请求的数目却是巨大的。
构建服务器应用程序的一个过于简单的模型应该是:每当一个请求到达就创建一个新线程,然后在新线程中为请求服务。实际上,对于原型开发这种方法工作得很好,但如果试图部署以这种方式运行的服务器应用程序,那么这种方法的严重不足就很明显。每个请求对应一个线程(thread-per-request)方法的不足之一是:为每个请求创建一个新线程的开销很大;为每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源要比花在处理实际的用户请求的时间和资源更多。
除了创建和销毁线程的开销之外,活动的线程也消耗系统资源。在一个 JVM 里创建太多的线程可能会导致系统由于过度消耗内存而用完内存或“切换过度”。为了防止资源不足,服务器应用程序需要一些办法来限制任何给定时刻处理的请求数目。
线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。其好处是,因为在请求到达时线程已经存在,所以无意中也消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使应用程序响应更快。而且,通过适当地调整线程池中的线程数目,也就是当请求的数目超过某个阈值时,就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从而可以防止资源不足。
03
—
工作队列
就线程池的实际实现方式而言,术语“线程池”有些使人误解,因为线程池“明显的”实现在大多数情形下并不一定产生我们希望的结果。术语“线程池”先于 Java 平台出现,因此它可能是较少面向对象方法的产物。然而,该术语仍继续广泛应用着。虽然我们可以轻易地实现一个线程池类,其中客户机类等待一个可用线程、将任务传递给该线程以便执行、然后在任务完成时将线程归还给池,但这种方法却存在几个潜在的负面影响。例如在池为空时,会发生什么呢?试图向池线程传递任务的调用者都会发现池为空,在调用者等待一个可用的池线程时,它的线程将阻塞。我们之所以要使用后台线程的原因之一常常是为了防止正在提交的线程被阻塞。完全堵住调用者,如在线程池的“明显的”实现的情况,可以杜绝我们试图解决的问题的发生。我们通常想要的是同一组固定的工作线程相结合的工作队列,它使用 wait() 和 notify() 来通知等待线程新的工作已经到达了。该工作队列通常被实现成具有相关监视器对象的某种链表。尽管 Thread API 没有对使用 Runnable 接口强加特殊要求,但使用 Runnable 对象队列的这种模式是调度程序和工作队列的公共约定。
04
—
编写一个正确的线程池
在JDK中提供了Executors工具类来快速创建线程池,不推荐使用Exectors来创建线程池。因为Executors创建线程池在诸多方面存在内存溢出的风险,推荐使用两种方式来实现线程池。
1 使用JDK原生的线程池。
2 使用guava提供的线程池。
提示
JDK提供的Executors创建线程池时使用的任务缓存队列是LinkedBlockQueue,不会指定队列空间大小,默认的队列容量是Integer.MAX_VALUE;这是一个极大的数字,可能会导致缓存的任务过多导致内存溢出。
正确代码示例
/**
* 任务缓存队列
* {@link LinkedBlockingQueue#LinkedBlockingQueue(int)}
*/
private BlockingQueue<Runnable> taskCacheQueue = new LinkedBlockingQueue<>(20);
private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
// 核心线程数量
10,
// 最大线程数量
15,
// 闲置线程的最大存活时间
100, TimeUnit.SECONDS,
// 任务缓存队列
taskCacheQueue,
// 线程创建工厂
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("es-data-%d").build(),
(runnable, executor) -> {
// 如果任务提交失败,阻塞当前主线程
executor.submit(runnable);
}
);
corePoolSize第一个参数是指核心的线程数量。
maxPoolSize第二个参数是指最大可创建线程的数量,如果任务过多,线程池的核心线程没有闲置线程,此时会创建线程,创建线程的数量不会大于maxPoolSize。
keepAliveTime第三个参数是指闲置线程的最大存活时间,当线程池中的非核心线程执行完任务之后并且没有其他任务,此时会销毁多余的线程。
taskCacheQueue第四个参数是指任务队列,主要是用于存储的是主线程提交的Task,用来缓存任务,因为线程池在某一个时间点可能不会立马执行任务,因为没有闲置的线程,所以需要暂时缓存。
threadFactory第五个参数是指线程创建工厂,是用来生产线程对象的。可以设置线程的命名。%d是指按照数字的增长来进行命名,比如es-data-1,es-data-2等,以此类推。也可以指定线程的优先级,是否是精灵线程,处理创建线程异常函数等。
rejectedExecutionHandler第五个参数是指任务拒绝策略,在线程池没有闲置线程,任务队列缓存满了之后会出现任务拒绝的现象,JDK默认提供了
四种拒绝策略,分别是CallerRunsPolicy使用主线程来执行提交的任务,AbortPolicy直接抛出异常,DiscardPolicy直接忽略所提交的任务,DiscardOldestPolicy丢弃任务队列中的老任务来执行新任务。当然也可以自定义操作,此处笔者定义了拒绝策略是写了一个死循环,无限提交当前被拒绝的任务,这当然会阻塞主线程,直到任务提交成功即可。
提交线程池任务代码示例
// 创建任务类
class Task implements Callable<Void> {
public Void call() throws Exception {
Thread.sleep(10000);
System.err.println("任务执行完成");
return null;
}
}
/**
* 任务提交
* @see com.google.common.util.concurrent.Futures
* @see ThreadPoolExecutor#submit(java.util.concurrent.Callable)
*/
public void testSubmitTask() {
Future<Void> future = this.threadPoolExecutor.submit(new Task());
}
需要执行的异步任务定义为Callable的实现类,返回类型是Callable的泛型。我们此处可以自定义封装任务的业务代码到实现方法call中。提交完任务之后会返回一个远期结果Future。Future的含义在于这个异步执行的结果将来可以拿到,但是目前不行,我们通常需要获取当前任务执行的结果,来做下一步的处理,此时我们可以通过Future#get()方法实现获取远期结果。
提示
如果需要传入参数到Callable中直接传递是不可以的,可以通过构造函数的方式来解决这个问题
05
—
阻塞Future的诟病
Future#get()方法是阻塞的,也就意味着我们只要调用future的get方法便会阻塞主线程,这也就违背了我们异步编程的初衷。我们既不想阻塞主线程,也想拿到远期结果进行处理。netty的future解决了这个问题,当然netty不适用于此处,笔者将这种Future称为异步Future。在guava中也提供了很优雅的方式来解决这个问题,如果需要使用guava来做线程池调度,我们需要给JDK远程的线程池做一层包装。
包装代码
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
包装完成之后会返回一个guava自己的定义的线程池对象。此时我们使用guava提供的Futures工具类类做异步future。
任务提交
// 任务提交
ListenableFuture<Void> listenableFuture = listeningExecutorService.submit(new Task());
处理远期结果(在guava中处理远期结果的方案是添加回调函数,Futures监听线程的处理结果,线程处理完成之后产生的结果当做一个参数传入guava的回调函数中)
Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
public void onSuccess( Void result) {
System.err.println(String.format("任务执行结果 :{%s} ", result));
}
public void onFailure(Throwable t) {
}
}, this.threadPoolExecutor);
06
—
Java8解决阻塞Future的方案
Java 8 中, 新增加了一个包含 50 个方法左右的类--CompletableFuture,它提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。对于阻塞或者轮询方式,依然可以通过 CompletableFuture 类的 CompletionStage 和 Future 接口方式支持。CompletableFuture 类声明了 CompletionStage 接口,CompletionStage 接口实际上提供了同步或异步运行计算的舞台,所以我们可以通过实现多个 CompletionStage 命令,并且将这些命令串联在一起的方式实现多个命令之间的触发。我们可以通过 CompletableFuture.supplyAsync(this::sendMsg); 这么一行代码创建一个简单的异步计算。在这行代码中,supplyAsync 支持异步地执行我们指定的方法,这个例子中的异步执行方法是 sendMsg。当然,我们也可以使用 Executor 执行异步程序,默认是 ForkJoinPool.commonPool()。
我们也可以在异步计算结束之后指定回调函数,例如 CompletableFuture.supplyAsync(this::sendMsg) .thenAccept(this::notify);这行代码中的 thenAccept 被用于增加回调函数,在我们的示例中 notify 就成了异步计算的消费者,它会处理计算结果。
/**
* @see CompletableFuture#acceptEitherAsync(CompletionStage, Consumer)
* @see CompletionStage#acceptEitherAsync(CompletionStage, Consumer)
* @see CompletableFuture#supplyAsync(Supplier, Executor) supplier方式
* @see CompletableFuture#runAsync(Runnable, Executor) consumer方式
*/
public void testCompleteFutureBaseApi() {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
return String.valueOf(new Task().call());
} catch (Exception e) {
e.printStackTrace();
return "执行任务发生异常";
}
}, this.threadPoolExecutor)
.whenComplete((result, throwable) -> {
System.err.println(String.format("执行任务过程中是否发生异常 {%s} ", throwable == null));
System.err.println(String.format("异步计算结果 %s", result));
});
}
CompletableFuture 是一个功能强大的异步编程辅助类,相比如guava而言,两者各有长处,guava的异步API相比较简单易懂,CompletableFuture的API相对有点不易于理解,但是偏向于Fluent风格,编写舒适,笔者不作推荐,看个人习惯。
https://github.com/ubuntu-m/opensource/blob/master/jopen-concurrent/src/test/java/io/jopen/util/concurrent/ThreadPoolExecutorTest.java
以上是关于异步编程的正确姿势的主要内容,如果未能解决你的问题,请参考以下文章
论异步编程的正确姿势:十个接口的活现在只需要一个接口就能搞定!