多线程 - CallableFuture 和 FutureTask 简单应用
Posted 程序员牧码
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程 - CallableFuture 和 FutureTask 简单应用相关的知识,希望对你有一定的参考价值。
我们知道创建线程的方式有两种,一种是实现Runnable接口,另一种是继承Thread,但是这两种方式都有个缺点,那就是在任务执行完成之后无法获取返回结果,那如果我们想要获取返回结果该如何实现呢?java 为我们提供了 Callable 接口和 Future ,从JAVA SE 5.0开始引入了Callable和Future,通过它们构建的线程,在任务执行完成后就可以获取执行结果,这就是线程的第三种方式,那就是实现Callable接口。
Callable<V>
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
该接口声明了一个名称为 call() 的方法,同时这个方法可以有返回值 V,也可以抛出异常。无论是 Runnable 接口的实现类还是 Callable 接口的实现类,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行,ThreadPoolExecutor或ScheduledThreadPoolExecutor都实现了ExcutorService 接口,而因此 Callable 需要和Executor 框架中的 ExcutorService 结合使用,我们先看看 ExecutorService 提供的方法
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
- 第一个方法:submit提交一个实现Runnable接口的任务,并且返回封装了异步计算结果的Future。
- 第二个方法:submit提交一个实现Runnable接口的任务,并且指定了在调用Future的get方法时返回的result对象。
- 第三个方法:submit提交一个实现Callable接口的任务,并且返回封装了异步计算结果的Future。
因此我们只要创建好我们的线程对象(实现Callable接口或者Runnable接口),然后通过上面3个方法提交给线程池去执行即可。还有点要注意的是,除了我们自己实现Callable对象外,我们还可以使用工厂类Executors来把一个Runnable对象包装成Callable对象。Executors工厂类提供的方法如下
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
Future<V>
Future<V>接口是用来获取异步计算结果的,说白了就是对具体的Runnable或者Callable对象任务执行的结果进行获取(get()),取消(cancel()),判断是否完成等操作。Future接口的源码:
public interface Future<V> {
// 如果任务还没有开始,执行 cancel() 方法将返回false,如果任务已经启动,执行 cancel(true) 方法将
// 已中断执行此任务线程的方式来试图停止任务,如果停止成功,返回 true,当任务已经启动,执行
// cancel(false) 方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时返回 false
// 当任务已经完成,执行 cancel() 方法将返回 false ,mayInterruptlfRunning 参数表示是否中断执行中的
// 线程,通过方法分析我们也知道实际上 Future 提供了 3 中可能:1.能够中断执行中的任务,2.判断
// 任务是否执行完成,3.获取任务执行完成后的结果,但是 Future 是一个接口,我们无法直接创建对象
// 因此就需要其实现类 FutureTask
boolean cancel(boolean mayInterruptIfRunning);
// 如果任务完成前被取消,则返回 true
boolean isCancelled();
// 如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回 true
boolean isDone();
// 该方法用于获取异步执行的结果,如果没有结果返回,该方法会一直阻塞直到任务执行完成
V get() throws InterruptedException, ExecutionException;
// 获取异步执行结果,如果没有结果返回,此方法会一直阻塞,但是会有时间的限制
// 如果阻塞时间超过设定的 timeout 时间,该方法会抛出异常
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
FutureTask类
FutureTask的实现
public class FutureTask<V> implements RunnableFuture<V> {}
FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
可以看到 FutureTask 除了实现了 Future 接口外还实现了 Runnable 接口,因此 FutureTask 也可以直接提交给 Executor 执行。 当然也可以调用线程直接执行(FutureTask.run())。接下来我们根据 FutureTask.run() 的执行时机来分析其所处的3种状态
-
未启动,FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态,当创建一个FutureTask,而且没有执行FutureTask.run()方法前,这个FutureTask也处于未启动状态。
-
已启动,FutureTask.run()被执行的过程中,FutureTask处于已启动状态。
-
已完成,FutureTask.run()方法执行完正常结束,或者被取消或者抛出异常而结束,FutureTask都处于完成状态。
-
当FutureTask处于未启动或已启动状态时,如果此时我们执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或者抛出异常。
-
当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会执行。
当FutureTask处于已启动状态时,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务,如果任务取消成功,cancel(...)返回true;但如果执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时cancel(...)返回false。
当任务已经完成,执行cancel(...)方法将返回false。 -
FutureTask 构造函数
public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}
简单使用案例
在互联网应用中有时候可能数据量会很大会把数据存储在不同的服务器,查询的时候会去查询不通的服务器,然后在把数据汇总,传统查询查完 A 服务器 再去 查询 B 服务然后在汇总,这样可能会导致等带的时间会比较长,使用 future 带返回值得线程就可以很好的解决这样的问题,在查询 A 服务器的同时去查询 B 服务器,最后在把数据汇总,例如
public class ExecutorFutureDemo implements Callable<Integer> {
public static void main(String[] args) throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
Future<Integer> result = executor.submit(new ExecutorFutureDemo());
System.out.println("开始查询服务器 B ...");
// 保存服务器 B 查询结果
int sum = 0;
// 主线程查询服务器 B 的结果
for (int i = 1; i <= 100 ; i++) {
sum += i;
}
// 模拟服务器 B 的查询时间
Thread.sleep(3000);
System.out.println("服务器 B 查询完毕。");
if(!result.isDone()){
System.out.println("服务器 A 数数据还没有查询完毕...");
}
// 在没有得到结果之前会一直阻塞在这里,直到拿到返回结果
Integer aSum = result.get();
long endTime = System.currentTimeMillis();
System.out.println("查询结果="+(aSum+sum)+",总耗时="+(endTime-startTime)/1000);
// 关闭线程池
executor.shutdown();
}
@Override
public Integer call() throws Exception {
System.out.println("开始查询服务器 A ...");
// 保存服务器 A 查询结果
int sum = 0;
try {
// 模拟服务器 A 的 查询时间
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 1; i <= 50 ; i++) {
sum += i;
}
System.out.println("服务器 A 查询完毕。");
return sum;
}
}
结果
开始查询服务器 B ...
开始查询服务器 A ...
服务器 B 查询完毕。
服务器 A 数数据还没有查询完毕...
服务器 A 查询完毕。
查询结果=6325,总耗时= 5 秒
传统查询需要 8 秒,而示例只需要 5 秒即可。
以上是关于多线程 - CallableFuture 和 FutureTask 简单应用的主要内容,如果未能解决你的问题,请参考以下文章
Java多线程编程:CallableFuture和FutureTask浅析
多线程 - CallableFuture 和 FutureTask 简单应用
Java并发编程- CallableFuture和FutureTask