java基础-Runnable与Callable

Posted holoyong

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java基础-Runnable与Callable相关的知识,希望对你有一定的参考价值。

下面这段测试代码,如果取消第一个future.get的注释,也就是说等第一次submit提交的任务执行完成后,再提交同一个任务,再次提交的任务并不会得到执行,因为此时callable的status已经不是NEW了。但如果将第一个future.get注释掉,也就是在第一次submit提交的任务完成前,再提交同一个任务,则再次提交的任务依旧会被执行,因为此时callable的status仍然是NEW。这点与使用execute提交Runnable任务有区别,Runnable本身是没有任何状态标识的,注意,Callable本身也没有状态,但是在jdk Executor框架中提交的Callable会被包装为java.util.concurrent.FutureTask。

 

 1     @Test
 2     public void testCallable() throws InterruptedException, ExecutionException {
 3         ExecutorService executor = Executors.newFixedThreadPool(2);
 4 //        int i = 0;
 5         final AtomicInteger i = new AtomicInteger(0);
 6         Callable<Object> callable = new Callable<Object>() {
 7             @Override
 8             public Object call() throws Exception {
 9                 Thread.sleep(1000);
10                 System.out.println(i.incrementAndGet());
11                 return null;
12             }
13         };
14         Future<Object> future = executor.submit(callable);
15 //        future.get();
16         executor.submit(callable);
17         executor.shutdown();
18 //        executor.shutdownNow();
19         executor.awaitTermination(20000, TimeUnit.MILLISECONDS);
20     }

 

先来看Executor提交(execute)Runnable任务的主要逻辑

 1         int c = ctl.get();
 2         //如果worker(执行线程)数小于corePoolSize,则增加worker,并将command作为其第一个work(Runnable)
 3         if (workerCountOf(c) < corePoolSize) {
 4             if (addWorker(command, true))
 5                 return;
 6             c = ctl.get();
 7         }
 8         //将command入队,并检查Executor状态,如果在运行但worker数为0(因为woker到达空闲时间就会被取消,就可能出现降为0的情况),则(如果worker数小于maximumPoolSize)增加woker
 9         if (isRunning(c) && workQueue.offer(command)) {
10             int recheck = ctl.get();
11             if (! isRunning(recheck) && remove(command))
12                 reject(command);
13             else if (workerCountOf(recheck) == 0)
14                 addWorker(null, false);
15         }
16         //最后再尝试添加worker,如果不成功说明Executor被关闭了或者worker数达到最大了
17         else if (!addWorker(command, false))
18             reject(command);    

 

再来Executor提交(submit)Callable

1     public <T> Future<T> submit(Callable<T> task) {
2         if (task == null) throw new NullPointerException();
3         //newTaskFor就是将Callable包装成一个FutureTask(RunnableFuture的实现,Future代表了一个异步计算的结果,RunnableFuture就是Future继承了Runnable,表示可执行的Future,可见FutureTask可执行(真正的执行逻辑call被run包装)、可获得计算结果(通过wait、notify),并且还通过interrupt提供了可取消)
4         RunnableFuture<T> ftask = newTaskFor(task);
5         //execute就是提交Runnable的方法
6         execute(ftask);
7         return ftask;
8     }

 

FutureTask

技术分享
 1 public class FutureTask<V> implements RunnableFuture<V> {
 2 
 3     ......
 4 
 5     private volatile int state;
 6     private static final int NEW          = 0;
 7     private static final int COMPLETING   = 1;
 8     private static final int NORMAL       = 2;
 9     private static final int EXCEPTIONAL  = 3;
10     private static final int CANCELLED    = 4;
11     private static final int INTERRUPTING = 5;
12     private static final int INTERRUPTED  = 6;
13 
14     /** The underlying callable; nulled out after running */
15     private Callable<V> callable;
16     /** The result to return or exception to throw from get() */
17     private Object outcome; // non-volatile, protected by state reads/writes
18     /** The thread running the callable; CASed during run() */
19     private volatile Thread runner;
20     /** Treiber stack of waiting threads,可能会有多个线程get同一个future,Callable执行完成后就需要notify所有的这些等待线程*/
21     private volatile WaitNode waiters;
22 
23     ......
24 
25     public FutureTask(Callable<V> callable) {
26         if (callable == null)
27             throw new NullPointerException();
28         this.callable = callable;
29         this.state = NEW;       // ensure visibility of callable
30     }
31 
32     ......
33 }
View Code

 

其实我们完全可以自己调用FutureTask

技术分享
 1     @Test
 2     public void testFutureTask() throws ExecutionException, InterruptedException {
 3         FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
 4             @Override
 5             public Integer call() throws Exception {
 6                 Thread.sleep(1000);
 7                 return 1;
 8             }
 9         });
10         new Thread(futureTask).start();
11         Integer result = futureTask.get();
12         System.out.println(result);//1
13     }
View Code

 

Executor的shutdown就是interrupt所有未被执行的work,shutdownNow则是interrupt所有work,awaitTermination并不对work进行操作,只是判断Executor是否已经terminated(shutdown了,并且没有work在执行了,就是terminated了)。

以上是关于java基础-Runnable与Callable的主要内容,如果未能解决你的问题,请参考以下文章

Java多线程Runnable与Callable区别与拓展

Java多线程之Callable接口与Runnable的实现以及选择

Callable, Runnable, Future, FutureTask

Java Callable使用

callable和runnable和future

java 创建线程的三种方法Callable,Runnable,Thread比较及用法