java基础-Runnable与Callable
Posted holoyong
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java基础-Runnable与Callable相关的知识,希望对你有一定的参考价值。
下面这段测试代码,如果取消第一个future.get的注释,也就是说等第一次submit提交的任务执行完成后,再提交同一个任务,再次提交的任务并不会得到执行,因为此时callable的status已经不是NEW了。但如果将第一个future.get注释掉,也就是在第一次submit提交的任务完成前,再提交同一个任务,则再次提交的任务依旧会被执行,因为此时callable的status仍然是NEW。这点与使用execute提交Runnable任务有区别,Runnable本身是没有任何状态标识的,注意,Callable本身也没有状态,但是在jdk Executor框架中提交的Callable会被包装为java.util.concurrent.FutureTask。
1 @Test 2 public void testCallable() throws InterruptedException, ExecutionException { 3 ExecutorService executor = Executors.newFixedThreadPool(2); 4 // int i = 0; 5 final AtomicInteger i = new AtomicInteger(0); 6 Callable<Object> callable = new Callable<Object>() { 7 @Override 8 public Object call() throws Exception { 9 Thread.sleep(1000); 10 System.out.println(i.incrementAndGet()); 11 return null; 12 } 13 }; 14 Future<Object> future = executor.submit(callable); 15 // future.get(); 16 executor.submit(callable); 17 executor.shutdown(); 18 // executor.shutdownNow(); 19 executor.awaitTermination(20000, TimeUnit.MILLISECONDS); 20 }
先来看Executor提交(execute)Runnable任务的主要逻辑
1 int c = ctl.get(); 2 //如果worker(执行线程)数小于corePoolSize,则增加worker,并将command作为其第一个work(Runnable) 3 if (workerCountOf(c) < corePoolSize) { 4 if (addWorker(command, true)) 5 return; 6 c = ctl.get(); 7 } 8 //将command入队,并检查Executor状态,如果在运行但worker数为0(因为woker到达空闲时间就会被取消,就可能出现降为0的情况),则(如果worker数小于maximumPoolSize)增加woker 9 if (isRunning(c) && workQueue.offer(command)) { 10 int recheck = ctl.get(); 11 if (! isRunning(recheck) && remove(command)) 12 reject(command); 13 else if (workerCountOf(recheck) == 0) 14 addWorker(null, false); 15 } 16 //最后再尝试添加worker,如果不成功说明Executor被关闭了或者worker数达到最大了 17 else if (!addWorker(command, false)) 18 reject(command);
再来Executor提交(submit)Callable
1 public <T> Future<T> submit(Callable<T> task) { 2 if (task == null) throw new NullPointerException(); 3 //newTaskFor就是将Callable包装成一个FutureTask(RunnableFuture的实现,Future代表了一个异步计算的结果,RunnableFuture就是Future继承了Runnable,表示可执行的Future,可见FutureTask可执行(真正的执行逻辑call被run包装)、可获得计算结果(通过wait、notify),并且还通过interrupt提供了可取消) 4 RunnableFuture<T> ftask = newTaskFor(task); 5 //execute就是提交Runnable的方法 6 execute(ftask); 7 return ftask; 8 }
FutureTask类
1 public class FutureTask<V> implements RunnableFuture<V> { 2 3 ...... 4 5 private volatile int state; 6 private static final int NEW = 0; 7 private static final int COMPLETING = 1; 8 private static final int NORMAL = 2; 9 private static final int EXCEPTIONAL = 3; 10 private static final int CANCELLED = 4; 11 private static final int INTERRUPTING = 5; 12 private static final int INTERRUPTED = 6; 13 14 /** The underlying callable; nulled out after running */ 15 private Callable<V> callable; 16 /** The result to return or exception to throw from get() */ 17 private Object outcome; // non-volatile, protected by state reads/writes 18 /** The thread running the callable; CASed during run() */ 19 private volatile Thread runner; 20 /** Treiber stack of waiting threads,可能会有多个线程get同一个future,Callable执行完成后就需要notify所有的这些等待线程*/ 21 private volatile WaitNode waiters; 22 23 ...... 24 25 public FutureTask(Callable<V> callable) { 26 if (callable == null) 27 throw new NullPointerException(); 28 this.callable = callable; 29 this.state = NEW; // ensure visibility of callable 30 } 31 32 ...... 33 }
其实我们完全可以自己调用FutureTask
1 @Test 2 public void testFutureTask() throws ExecutionException, InterruptedException { 3 FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() { 4 @Override 5 public Integer call() throws Exception { 6 Thread.sleep(1000); 7 return 1; 8 } 9 }); 10 new Thread(futureTask).start(); 11 Integer result = futureTask.get(); 12 System.out.println(result);//1 13 }
Executor的shutdown就是interrupt所有未被执行的work,shutdownNow则是interrupt所有work,awaitTermination并不对work进行操作,只是判断Executor是否已经terminated(shutdown了,并且没有work在执行了,就是terminated了)。
以上是关于java基础-Runnable与Callable的主要内容,如果未能解决你的问题,请参考以下文章
Java多线程之Callable接口与Runnable的实现以及选择