java并发之线程执行器(Executor)
Posted 专注,坚持
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java并发之线程执行器(Executor)相关的知识,希望对你有一定的参考价值。
线程执行器和不使用线程执行器的对比(优缺点)
1.线程执行器分离了任务的创建和执行,通过使用执行器,只需要实现Runnable接口的对象,然后把这些对象发送给执行器即可。
2.使用线程池来提高程序的性能。当发送一个任务给执行器时,执行器会尝试使用线程池中的线程来执行这个任务。避免了不断创建和销毁线程导致的性能开销。
3.执行器可以处理实现了Callable接口的任务。Callable接口类似于Runnable接口,却提供了两方面的增强:
a.Callable主方法名称为call(),可以返回结果
b.当发送一个Callable对象给执行器时,将获得一个实现了Future接口的对象。可以使用这个对象来控制Callable对象的状态和结果。
4.提供了一些操作线程任务的功能
- 执行继承了Runnable接口的任务类
1 public class Task implements Runnable { 2 private String name; 3 4 public Task(String name){ 5 this.name=name; 6 } 7 @Override 8 public void run() { 9 } 10 }
1 public class Server { 2 private ThreadPoolExecutor executor; 3 4 public Server(){ 5 executor=(ThreadPoolExecutor)Executors.newCachedThreadPool(); 6 } 7 public void executeTask(Task task){ 8 System.out.printf("Server: A new task has arrived\\n"); 9 executor.execute(task); 10 System.out.printf("Server: Active Count: %d\\n",executor.getActiveCount()); 11 System.out.printf("Server: Completed Tasks: %d\\n",executor.getCompletedTaskCount()); 12 } 13 public void endServer() { 14 executor.shutdown(); 15 } 16 }
- 执行实现了Callable<T>接口的任务
1 public class FactorialCalculator implements Callable<Integer> { 2 private Integer number; 3 public FactorialCalculator(Integer number){ 4 this.number=number; 5 } 6 7 @Override 8 public Integer call() throws Exception { 9 int num, result; 10 11 num=number.intValue(); 12 result=1; 13 14 // If the number is 0 or 1, return the 1 value 15 if ((num==0)||(num==1)) { 16 result=1; 17 } else { 18 // Else, calculate the factorial 19 for (int i=2; i<=number; i++) { 20 result*=i; 21 Thread.sleep(20); 22 } 23 } 24 System.out.printf("%s: %d\\n",Thread.currentThread().getName(),result); 25 // Return the value 26 return result; 27 } 28 }
1 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);//实例化执行器 2 FactorialCalculator calculator = new FactorialCalculator(number);//实例化任务 3 Future<Integer> result = executor.submit(calculator);//执行任务,并返回Future<T>实例
1 public interface Callable<V> { 2 V call() throws Exception; 3 }
1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new SynchronousQueue<Runnable>()); 5 }
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>()); 5 }
1 public static ExecutorService newSingleThreadExecutor() { 2 return new FinalizableDelegatedExecutorService 3 (new ThreadPoolExecutor(1, 1, 4 0L, TimeUnit.MILLISECONDS, 5 new LinkedBlockingQueue<Runnable>())); 6 }
1 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 2 return new ScheduledThreadPoolExecutor(corePoolSize); 3 }
1 ScheduledExecutorService executor=(ScheduledExecutorService)Executors.newScheduledThreadPool(1); 2 executor.schedule(task,i+1 , TimeUnit.SECONDS);
1 public <V> ScheduledFuture<V> schedule(Callable<V> callable,//即将执行的任务 2 long delay,//任务执行前需要等待的时间 3 TimeUnit unit)//时间单位
1 ScheduledExecutorService executor=(ScheduledExecutorService)Executors.newScheduledThreadPool(1); 2 3 for (int i=0; i<5; i++) { 4 Task task=new Task("Task "+i); 5 executor.schedule(task,i+1 , TimeUnit.SECONDS); 6 } 7 8 executor.shutdown();
1 public <V> ScheduledFuture<V> schedule(Callable<V> callable,//即将执行的任务 2 long delay, //任务执行前需要等待的时间 3 TimeUnit unit) //时间单位
1 ScheduledExecutorService executor=Executors.newScheduledThreadPool(1); 2 3 Task task=new Task("Task"); 4 ScheduledFuture<?> result=executor.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
1 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, //将被周期性执行的任务 2 long initialDelay,//任务第一次执行后的延后时间 3 long period, //两次执行的时间周期 4 TimeUnit unit) { //第二和第三个参数的时间单位
1 UserValidator ldapValidator=new UserValidator("LDAP"); 2 UserValidator dbValidator=new UserValidator("DataBase"); 3 4 // Create two tasks for the user validation objects 5 TaskValidator ldapTask=new TaskValidator(ldapValidator, username, password); 6 TaskValidator dbTask=new TaskValidator(dbValidator,username,password); 7 8 // Add the two tasks to a list of tasks 9 List<TaskValidator> taskList=new ArrayList<>(); 10 taskList.add(ldapTask); 11 taskList.add(dbTask); 12 13 // Create a new Executor 14 ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool(); 15 String result; 16 try { 17 // Send the list of tasks to the executor and waits for the result of the first task 18 // that finish without throw and Exception. If all the tasks throw and Exception, the 19 // method throws and ExecutionException. 20 result = executor.invokeAny(taskList); 21 System.out.printf("Main: Result: %s\\n",result); 22 } catch (InterruptedException e) { 23 e.printStackTrace(); 24 } catch (ExecutionException e) { 25 e.printStackTrace(); 26 } 27 28 // Shutdown the Executor 29 executor.shutdown();
1 public boolean validate(String name, String password) { 2 Random random=new Random(); 3 4 try { 5 Long duration=(long)(Math.random()*10); 6 System.out.printf("Validator %s: Validating a user during %d seconds\\n",this.name,duration); 7 TimeUnit.SECONDS.sleep(duration); 8 } catch (InterruptedException e) { 9 return false; 10 } 11 12 return random.nextBoolean(); 13 }
1 List<Task> taskList = new ArrayList<>(); 2 for (int i = 0; i < 3; i++) { 3 Task task = new Task("Task-" + i); 4 taskList.add(task); 5 } 6 // Call the invokeAll() method 7 List<Future<Result>> resultList = null; 8 try { 9 resultList = executor.invokeAll(taskList); 10 } catch (InterruptedException e) { 11 e.printStackTrace(); 12 } 13 // Finish the executor 14 executor.shutdown();
1 Task task=new Task(); 2 3 Future<String> result=executor.submit(task); 4 5 try { 6 TimeUnit.SECONDS.sleep(2); 7 } catch (InterruptedException e) { 8 e.printStackTrace(); 9 } 10 11 System.out.printf("Main: Cancelling the Task\\n"); 12 result.cancel(true);
1 public V get() throws InterruptedException, ExecutionException { 2 int s = state; 3 if (s <= COMPLETING) 4 s = awaitDone(false, 0L); 5 return report(s); 6 }
1 public class ResultTask extends FutureTask<String> { 2 @Override 3 protected void done() { 4 if (isCancelled()) { 5 System.out.printf("%s: Has been cancelled\\n",name); 6 } else { 7 System.out.printf("%s: Has finished\\n",name); 8 } 9 } 10 }
1.从list中遍历的每个Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住,如果系统是设计成每个线程完成后就能根据其结果继续做后面的事,这样对于处于list后面的但是先完成的线程就会增加了额外的等待时间。
2.而CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。
1 public class RejectedTaskController implements RejectedExecutionHandler { 2 3 @Override 4 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 5 System.out.printf("RejectedTaskController: The task %s has been rejected\\n",r.toString()); 6 System.out.printf("RejectedTaskController: %s\\n",executor.toString()); 7 System.out.printf("RejectedTaskController: Terminating: %s\\n",executor.isTerminating()); 8 System.out.printf("RejectedTaksController: Terminated: %s\\n",executor.isTerminated()); 9 } 10 }
1 public static void main(String[] args) { 2 // Create the controller for the Rejected tasks 3 RejectedTaskController controller=new RejectedTaskController(); 4 // Create the executor and establish the controller for the Rejected tasks 5 ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newCachedThreadPool(); 6 executor.setRejectedExecutionHandler(controller); 7 8 // Lauch three tasks 9 System.out.printf("Main: Starting.\\n"); 10 for (int i=0; i<3; i++) { 11 Task task=new Task("Task"+i); 12 executor.submit(task); 13 } 14 15 // Shutdown the executor 16 System.out.printf("Main: Shuting down the Executor.\\n"); 17 executor.shutdown(); 18 // Send another task 19 System.out.printf("Main: Sending another Task.\\n"); 20 Task task=new Task("RejectedTask"); 21 executor.submit(task); 22 23 // The program ends 24 System.out.printf("Main: End.\\n"); 25 26 }
Main: Starting. Main: Shuting down the Executor. Main: Sending another Task. RejectedTaskController: The task java.util.concurrent.FutureTask@60e53b93 has been rejected RejectedTaskController: java.util.concurrent.ThreadPoolExecutor@5e2de80c[Shutting down, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0] RejectedTaskController: Terminating: true RejectedTaksController: Terminated: false Main: End. Task Task1: Starting Task Task0: Starting Task Task2: Starting Task Task1: ReportGenerator: Generating a report during 4 seconds Task Task0: ReportGenerator: Generating a report during 7 seconds Task Task2: ReportGenerator: Generating a report during 6 seconds Task Task1: Ending Task Task2: Ending Task Task0: Ending
以上是关于java并发之线程执行器(Executor)的主要内容,如果未能解决你的问题,请参考以下文章
Java——多线程高并发系列之线程池(Executor)的理解与使用