Java多线程ExecutorService与ExecutorCompletionService

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java多线程ExecutorService与ExecutorCompletionService相关的知识,希望对你有一定的参考价值。

  ExecutorService与ExecutorCompletionService都是java.util.concurrent包的并发处理类,总的来说,ExecutorCompletionService是ExecutorService的功能增强版,ExecutorCompletionService以BlockingQueue<Future<V>>来存放已经完成的任务。

   也就是说,优先完成的任务会优先存放在BlockingQueue<Future<V>>队列中,所以我们能及时的拿到最优先的处理结果。


   让我们先看看ExecutorService的测试代码,共4个任务,我们刻意让第1个任务的执行时间最长,依次递减,代码如下:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestExecutorService 
{
	public static void main(String[] args) throws InterruptedException, ExecutionException 
	{
		ExecutorService es = Executors.newFixedThreadPool(4);
		
		//第一个任务
		Callable<Integer> task1 = new Callable<Integer>() {
			public Integer call() throws Exception 
			{
				//耗时最长, 4秒
				Thread.sleep(4000);
				return 1;
			}
		};
		
		//第二个任务
		Callable<Integer> task2 = new Callable<Integer>() {
			public Integer call() throws Exception 
			{
				Thread.sleep(3000);
				return 2;
			}
		};
		
		//第三个任务
		Callable<Integer> task3 = new Callable<Integer>() {
			public Integer call() throws Exception 
			{
				Thread.sleep(2000);
				return 3;
			}
		};
		
		//第四个任务
		Callable<Integer> task4 = new Callable<Integer>() {
			public Integer call() throws Exception 
			{
				Thread.sleep(1000);
				return 4;
			}
		};
		
		Future<Integer> result1 = es.submit(task1);
		Future<Integer> result2 = es.submit(task2);
		Future<Integer> result3 = es.submit(task3);
		Future<Integer> result4 = es.submit(task4);
		
		System.out.println("第1个任务等待中...");
		System.out.println("第1个任务完成:【" + result1.get() + "】");
		
		System.out.println("第2个任务等待中...");
		System.out.println("第2个任务完成:【" + result2.get() + "】");
		
		System.out.println("第3个任务等待中...");
		System.out.println("第3个任务完成:【" + result3.get() + "】");
		
		System.out.println("第4个任务等待中...");
		System.out.println("第4个任务完成:【" + result4.get() + "】");
	}
}

   输出结果:

第1个任务等待中...
第1个任务完成:【1】
第2个任务等待中...
第2个任务完成:【2】
第3个任务等待中...
第3个任务完成:【3】
第4个任务等待中...
第4个任务完成:【4】

   PS:Future.get()方法会造成阻塞,直到任务执行完毕为止。

   运行代码可见,result1.get()阻塞了4秒后完成任务,输出结果,而紧随的result2.get(),result3.get(),result4.get()没有阻塞,立马输出结果。那是因为在result1.get()执行完毕时,其余3个任务早已执行完毕等待抓取结果了。所以,使用上述方法并不能得知哪个任务是最先返回结果的。




   接下来,让我们看看ExecutorCompletionService的代码:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestExecutorComplectionService 
{
	public static void main(String[] args) throws InterruptedException, ExecutionException 
	{
		ExecutorService es = Executors.newFixedThreadPool(4);
		ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<Integer>(es);
		
		//第一个任务
		Callable<Integer> task1 = new Callable<Integer>() {
			public Integer call() throws Exception 
			{
				//耗时最长, 4秒
				Thread.sleep(4000);
				return 1;
			}
		};
		
		//第二个任务
		Callable<Integer> task2 = new Callable<Integer>() {
			public Integer call() throws Exception 
			{
				Thread.sleep(3000);
				return 2;
			}
		};
		
		//第三个任务
		Callable<Integer> task3 = new Callable<Integer>() {
			public Integer call() throws Exception 
			{
				Thread.sleep(2000);
				return 3;
			}
		};
		
		//第四个任务
		Callable<Integer> task4 = new Callable<Integer>() {
			public Integer call() throws Exception 
			{
				Thread.sleep(1000);
				return 4;
			}
		};
		
		ecs.submit(task1);
		ecs.submit(task2);
		ecs.submit(task3);
		ecs.submit(task4);
		
		for(int i = 0; i < 4; i++)
		{
			Future<Integer> result = ecs.take();
			System.out.println("输出结果:【" + result.get() + "】");
		}
	}
}

   输出结果:

输出结果:【4】
输出结果:【3】
输出结果:【2】
输出结果:【1】

   可见,第4个任务task4执行的时间是最短的,第1个输出结果。


   下面让我们剖析一下ExecutorComplectionService的源码:


   成员变量如下:

技术分享

   executor:ExecutorService类,任务并行执行器

   completionQueue:就是保存执行结果的阻塞队列BlockingQueue


技术分享

   submit方法:底层依旧使用ExecutorService来并发执行任务,只不过是多了个功能【把执行完毕的任务放到complectionQueue队列中】


技术分享

   task方法:从complectionQueue队列中获取一个元素,如果没有元素,则阻塞,直到队列中有元素位置,这验证了我们之前的说法。


  总结:总的来说,ExecutorComplectionService其实就是 ExecutorService 和 BlockingQueue的结合。


  

   

   


    

本文出自 “DeaconLi” 博客,请务必保留此出处http://lizhuquan0769.blog.51cto.com/2591147/1789158

以上是关于Java多线程ExecutorService与ExecutorCompletionService的主要内容,如果未能解决你的问题,请参考以下文章

java多线程之Executor 与 ExecutorService两个基本接口

Java多线程系列七——ExecutorService

java多线程(ExecutorService)

Java--多线程之生产者消费者模式;线程池ExecutorService

java多线程之Executor框架线程池详细介绍与ThreadPoolExecutor

java中ExecutorService使用多线程处理业务