Google Guava 并发编程 - ListenableFuture

Posted tuacy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Google Guava 并发编程 - ListenableFuture相关的知识,希望对你有一定的参考价值。

       并发编程在咱们实际开发过程中肯定经常用到。比如线程池呀、通过Future去获取任务的执行结果呀等等。Guava从实际出发,为了方便我们的使用对JDK里面原生的一些并发编程方式做了增强。

       Guava 里面并发编程相关的类非常多,我们对嘴常用的三个类MoreExecutors、Futures、ListenableFuture的使用做一个简单的介绍。我相信通过这三个类的介绍。咱们应该能用Guava来应对大部分的并发编程了。

一 MoreExecutors

       MoreExecutors是一个工具类(和咱们经常使用Executors类类似),用来生成各种Executors类。比如生成一个可以随着jvm关闭而关闭的线程池、创建一个顺序执行的Executor等等。我们对MoreExecutors里面的各个方法做一个简单的解释。

public final class MoreExecutors 
	/**
	 * 获得一个随着jvm关闭而关闭的线程池ExecutorService
	 * 并且可以设置jvm关闭时最多等待多长时间关闭线程池
	 */
	@Beta
	@GwtIncompatible // TODO
	@SuppressWarnings("GoodTime") // should accept a java.time.Duration
	public static ExecutorService getExitingExecutorService(
		ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit);

	/**
	 * 获得一个随着jvm关闭而关闭的线程池
	 * 默认jvm关闭时最多等待120秒关闭线程池
	 */
	@Beta
	@GwtIncompatible // concurrency
	public static ExecutorService getExitingExecutorService(ThreadPoolExecutor executor);

	/**
	 * 获得一个随着jvm关闭而关闭的可以添加定时任务线程池ScheduledExecutorService
	 * 并且可以设置jvm关闭时最多等待多长时间关闭线程池
	 */
	@Beta
	@GwtIncompatible // TODO
	@SuppressWarnings("GoodTime") // should accept a java.time.Duration
	public static ScheduledExecutorService getExitingScheduledExecutorService(
		ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit);

	/**
	 * 获得一个随着jvm关闭而关闭的定时任务线程池ExecutorService
	 * 默认jvm关闭时最多等待120秒关闭线程池
	 */
	@Beta
	@GwtIncompatible // TODO
	public static ScheduledExecutorService getExitingScheduledExecutorService(ScheduledThreadPoolExecutor executor);

	/**
	 * 增加关闭线程池的钩子,不改变ThreadFactory。
	 * jvm关闭的时候会调用到这些钩子方法。而且钩子里面调用了线程池的shutdown()函数。
	 * 所以可以在shutdown()函数里面做一些清理工作(所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁等操作)
	 */
	@Beta
	@GwtIncompatible // TODO
	@SuppressWarnings("GoodTime") // should accept a java.time.Duration
	public static void addDelayedShutdownHook(ExecutorService service, long terminationTimeout, TimeUnit timeUnit);


	/**
	 * 线程池里面的线程都会是后台线程。后台线程的特点如果一个程序里面没有了前台线程那么后台线程会关闭
	 */
	@GwtIncompatible // TODO
	private static void useDaemonThreadFactory(ThreadPoolExecutor executor);


	/**
	 * 创建一个直接执行的线程池ListeningExecutorService,线程池里面所有的任务在提交的时候就执行
	 */
	@GwtIncompatible // TODO
	public static ListeningExecutorService newDirectExecutorService();

	/**
	 * 创建一个直接执行的Executor,调用execute()会在当前线程执行Runnable的方法
	 */
	public static Executor directExecutor();

	/**
	 * 创建一个顺序执行的Executor
	 */
	@Beta
	@GwtIncompatible
	public static Executor newSequentialExecutor(Executor delegate);

	/**
	 * ExecutorService转换为ListeningExecutorService
	 * 两者的区别就是submit(),invokeAll(),invokeAll()函数的返回值不同
	 * 一个返回ListenableFuture,一个返回Future
	 */
	@GwtIncompatible // TODO
	public static ListeningExecutorService listeningDecorator(ExecutorService delegate);

	/**
	 * ScheduledExecutorService转换为ListeningScheduledExecutorService。
	 * 两者的区别就是schedule(),scheduleAtFixedRate(),scheduleWithFixedDelay()函数返回值不同
	 * 一个返回ListenableScheduledFuture一个返回ScheduledFuture
	 */
	@GwtIncompatible // TODO
	public static ListeningScheduledExecutorService listeningDecorator(ScheduledExecutorService delegate);


	/**
	 * 返回用于创建新线程的默认线程工厂
	 */
	@Beta
	@GwtIncompatible // concurrency
	public static ThreadFactory platformThreadFactory();

	/**
	 * 尝试在指定时间内关闭线程池,并且返回是否关闭的结果
	 */
	@Beta
	@CanIgnoreReturnValue
	@GwtIncompatible // concurrency
	@SuppressWarnings("GoodTime") // should accept a java.time.Duration
	public static boolean shutdownAndAwaitTermination(ExecutorService service, long timeout, TimeUnit unit);

       MoreExecutors里面有些方法返回的是ListeningExecutorService或者ListeningScheduledExecutorService。他两分别是ExecutorService和ScheduledExecutorService的扩展。他两其实很简单,在对应扩展类的基础上重写了一些方法把原来返回Future的地方换成了ListenableFuture。MoreExecutors里面其他的一些方法应该都是很好理解的吧。

二 Futures

       Futures我们也可以把他当做一个工具类来使用,里面也提供了一些对ListenableFuture各种各样的操作使用。我们也对Futures里面的一些方法做一个简单的介绍。如下所示。

public final class Futures extends GwtFuturesCatchingSpecialization  

	/**
	 * 创建一个设置value值的ListenableFuture,其value值立即会被设置,
	 * 这个 Future不能取消或超时,Future的isDone()方法总是返回true
	 */
	public static <V> ListenableFuture<V> immediateFuture(@Nullable V value);


	/**
	 * 创建一个构造时设置传入异常的ListenableFuture,返回的Future不能被取消,
	 * isDone()方法总是返回 true,调用get()方法会抛出被ExecutionException包装的Throwable
	 */
	public static <V> ListenableFuture<V> immediateFailedFuture(Throwable throwable);

	/**
	 * 创建一个会被立即取消的ListenableFuture,所以isCancelled()方法总是返回true
	 */
	public static <V> ListenableFuture<V> immediateCancelledFuture();


	/**
	 * 创建一个异步的ListenableFuture,你可以简单的认为执行AsyncCallable产生的ListenableFuture
	 * 给定的callable操作由executor执行
	 */
	@Beta
	public static <O> ListenableFuture<O> submitAsync(AsyncCallable<O> callable, Executor executor);

	/**
	 * 延时一段时间去执行AsyncCallable里面的任务
	 * 给定的fallback操作由executor执行
	 */
	@Beta
	@GwtIncompatible // java.util.concurrent.ScheduledExecutorService
	@SuppressWarnings("GoodTime") // should accept a java.time.Duration
	public static <O> ListenableFuture<O> scheduleAsync(
		AsyncCallable<O> callable,
		long delay,
		TimeUnit timeUnit,
		ScheduledExecutorService executorService);

	/**
	 * 如果input任务产生了exceptionType类型的异常,就会把该异常传入fallback里面去做处理,执行fallback里面的逻辑
	 * 给定的fallback操作由executor执行
	 */
	@Beta
	@Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
	public static <V, X extends Throwable> ListenableFuture<V> catching(
		ListenableFuture<? extends V> input,
		Class<X> exceptionType,
		Function<? super X, ? extends V> fallback,
		Executor executor);

	/**
	 * 如果input任务产生了exceptionType类型的异常,就会把该异常传入到fallback里面去做处理,执行fallback里面的逻辑
	 */
	@Beta
	@Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
	public static <V, X extends Throwable> ListenableFuture<V> catchingAsync(
		ListenableFuture<? extends V> input,
		Class<X> exceptionType,
		AsyncFunction<? super X, ? extends V> fallback,
		Executor executor);

	/**
	 * delegate指定时间没有获取到结果,就time out
	 */
	@Beta
	@GwtIncompatible // java.util.concurrent.ScheduledExecutorService
	@SuppressWarnings("GoodTime") // should accept a java.time.Duration
	public static <V> ListenableFuture<V> withTimeout(
		ListenableFuture<V> delegate,
		long time,
		TimeUnit unit,
		ScheduledExecutorService scheduledExecutor);

	/**
	 * 对input执行的结果做转换
	 * 给定的function操作由executor执行
	 */
	@Beta
	public static <I, O> ListenableFuture<O> transformAsync(
		ListenableFuture<I> input,
		AsyncFunction<? super I, ? extends O> function,
		Executor executor);

	/**
	 * 对input执行的结果做转换
	 */
	@Beta
	public static <I, O> ListenableFuture<O> transform(
		ListenableFuture<I> input, Function<? super I, ? extends O> function, Executor executor);

	/**
	 * 与transform方法类似,不过function的转换过程会每次调用返回的Future的get()方法
	 */
	@Beta
	@GwtIncompatible
	public static <I, O> Future<O> lazyTransform(
		final Future<I> input, final Function<? super I, ? extends O> function);

	/**
	 * 创建一个新的ListenableFuture,当给定的futures集合中future都完成时返回结果的list,如果任一 future失败,则创建的ListenableFuture会失败。
	 */
	@Beta
	@SafeVarargs
	public static <V> ListenableFuture<List<V>> allAsList(ListenableFuture<? extends V>... futures);
	@Beta
	public static <V> ListenableFuture<List<V>> allAsList(Iterable<? extends ListenableFuture<? extends V>> futures);

	/**
	 * 返回一个FutureCombiner。然后我们可以调用FutureCombiner的callAsync,call,Runnable三个方法,这是三个方法都会返回一个ListenableFuture
	 */
	@Beta
	@SafeVarargs
	public static <V> FutureCombiner<V> whenAllComplete(ListenableFuture<? extends V>... futures);
	@Beta
	public static <V> FutureCombiner<V> whenAllComplete(Iterable<? extends ListenableFuture<? extends V>> futures);

	/**
	 * 返回一个FutureCombiner。然后我们可以调用FutureCombiner的callAsync,call,Runnable三个方法,这是三个方法都会返回一个ListenableFuture
	 * 如果有一个任务异常了,三个方法返回的ListenableFuture也异常
	 */
	@Beta
	@SafeVarargs
	public static <V> FutureCombiner<V> whenAllSucceed(ListenableFuture<? extends V>... futures);
	@Beta
	public static <V> FutureCombiner<V> whenAllSucceed(Iterable<? extends ListenableFuture<? extends V>> futures);


	/**
	 * 创建一个新的ListenableFuture,当给定的future完成时,它的结果才会被设置。
	 * 取消给定的future也会取消返回 的ListenableFuture,但是取消返回的ListenableFuture对给定的future没有影响
	 */
	@Beta
	public static <V> ListenableFuture<V> nonCancellationPropagating(ListenableFuture<V> future);

	/**
	 * 创建一个新的ListenableFuture,它的值是一个列表包含所有传入Future集合中Future执行成功后返回的值,
	 * 结果的顺序与传入集合的顺序一致,如果任何一个传入集合中的Future失败或被取消,其对应的位置的值将会被设置为null
	 */
	@Beta
	@SafeVarargs
	public static <V> ListenableFuture<List<V>> successfulAsList(ListenableFuture<? extends V>... futures);
	@Beta
	public static <V> ListenableFuture<List<V>> successfulAsList(Iterable<? extends ListenableFuture<? extends V>> futures);

	/**
	 * 按照完成时间的快慢排序
	 */
	@Beta
	public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(Iterable<? extends ListenableFuture<? extends T>> futures);

	/**
	 * 注册一个在任务成功或失败时运行的回调函数,一旦Future完成,则立即执行回调函数。
	 * callback在executor上执行
	 */
	public static <V> void addCallback(final ListenableFuture<V> future, final FutureCallback<? super V> callback, Executor executor);

	/**
	 * 获取future对应的返回结果
	 */
	@CanIgnoreReturnValue
	// TODO(cpovirk): Consider calling getDone() in our own code.
	public static <V> V getDone(Future<V> future) throws ExecutionException;

	/**
	 * 如果任性过程中有异常发生会把RuntimeException之外的异常转换为exceptionClass异常
	 */
	@Beta
	@CanIgnoreReturnValue
	@GwtIncompatible // reflection
	public static <V, X extends Exception> V getChecked(Future<V> future, Class<X> exceptionClass) throws X;

	/**
	 * 同上,不过添加了获取结果的限定时间,指定时间没有获取到也会抛exceptionClass异常
	 */
	@Beta
	@CanIgnoreReturnValue
	@GwtIncompatible // reflection
	@SuppressWarnings("GoodTime") // should accept a java.time.Duration
	public static <V, X extends Exception> V getChecked(Future<V> future, Class<X> exceptionClass, long timeout, TimeUnit unit) throws X;

	/**
	 * 任务执行过程中如果有异常产生会转换成UncheckedExecutionException异常
	 */
	@CanIgnoreReturnValue
	public static <V> V getUnchecked(Future<V> future);


       虽然对Futures里面的每个方法做了简单的介绍,但是为了加深大家的理解。这里我们对Futures里面的大部分方法都写了个简单的使用(这里解释下为啥我把这部分代码放到单元测试里面去了,我是为了方便运行)。代码地址 https://github.com/tuacy/google-guava-study 代码都在测试包里面,包路径com.tuacy.guava.study.concurrent。

三 ListenableFuture

       ListenableFuture顾名思义就是可以监听的Future,它是对java原生Future的扩展增强。

public interface ListenableFuture<V> extends Future<V> 
  void addListener(Runnable listener, Executor executor);

       我们可能会去想ListenableFuture和Future比的优势在哪里,然道仅仅只是通过监听及时来获取执行的结果么。肯定不是的配合Futures里面的方法我们还可以实现任务的链式调用(任务顺序执行,后一个任务可以拿到前一个任务的执行结果)。这个我们会在下面的ListenableFuture的使用里面讲到。我们还是先来看下ListenableFuture的创建。

3.1 ListenableFuture的创建

3.1.1 JdkFutureAdapters.listenInPoolThread()方法创建

       通过JdkFutureAdapters.listenInPoolThread()方法我们可以把Future转换成ListenableFuture。我们通过一个简单的实例来说明。

	@Test
	public void jdkFutureAdaptersTest() 

		final CountDownLatch latch = new CountDownLatch(1);
		FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() 
			@Override
			public String call() throws Exception 
				System.out.println("任务开始执行");
				// 模拟一个耗时操作
				Thread.sleep(10 * 1000);
				return "任务正常结束";
			
		);
		// 执行任务
		Executors.newCachedThreadPool().execute(futureTask);
		// futureTask转换成ListenableFuture
		ListenableFuture<String> listenableFuture = JdkFutureAdapters.listenInPoolThread(futureTask);
		// 监听返回结果
		Futures.addCallback(listenableFuture, new FutureCallback<String>() 
			@Override
			public void onSuccess(String result) 
				System.out.println("任务结果: 正常执行完成 -- " + result);
				latch.countDown();
			

			@Override
			public void onFailure(Throwable t) 
				System.out.println("任务结果: 抛异常了");
				latch.countDown();
			
		, MoreExecutors.directExecutor());

		try 
			// 等待所有的线程执行完
			latch.await();
		 catch (InterruptedException e) 
			e.printStackTrace();
		
	

3.1.2 ListenableFutureTask.create()方法创建

       通过ListenableFutureTask.create()方法我们可以把Callable、Runnable转换成ListenableFutureTask。我们还是用一个简单的代码来说明

	@Test
	public void listenableFutureTaskCreateTest() 

		final CountDownLatch latch = new CountDownLatch(2);
		// Callable转换成ListenableFutureTask
		ListenableFutureTask<String> listenableFutureTask1 = ListenableFutureTask.create(new Callable<String>() 
			@Override
			public String call() throws Exception 
				// 模拟一个耗时操作
				Thread.sleep(10 * 1000);
				return "我是正常的";
			
		);
		// 线程池里面执行任务
		//		Executors.newCachedThreadPool().execute(listenableTask);
		sService.execute(listenableFutureTask1);
		// 监听返回结果
		Futures.addCallback(listenableFutureTask1, new FutureCallback<String>() 
			@Override
			public void onSuccess(String result) 
				System.out.println("任务1(测试正常执行)结果: 正常执行完成 -- " + result);
				latch.countDown();
			

			@Override
			public void onFailure(Throwable t) 
				System.out.println("任务1(测试正常执行)结果: 抛异常了");
				latch.countDown();
			
		, MoreExecutors.directExecutor());

		// Runnable转换成ListenableFutureTask
		ListenableFutureTask<String> listenableFutureTask2 = ListenableFutureTask.create(new Runnable() 
			@Override
			public void run() 
				try 
					Thread.sleep(10 * 1000);
				 catch (InterruptedException e) 
					e.printStackTrace();
				
			
		, "success");
		sService.execute(listenableFutureTask2);
		Futures.addCallback(listenableFutureTask2, new FutureCallback<String>() 
			@Override
			public void onSuccess(String result) 
				System.out.println("任务1(测试正常执行)结果: 正常执行完成 -- " + result);
				latch.countDown();
			

			@Override
			public void onFailure(Throwable t) 
				System.out.println("任务1(测试正常执行)结果: 抛异常了");
				latch.countDown();
			
		, MoreExecutors.directExecutor());

		try 
			// 等待所有的线程执行完
			latch.await();
		 catch (InterruptedException e) 
			e.printStackTrace();
		

	

3.1.3 ListeningExecutorService对象的submit()方法创建

ListeningScheduledExecutorService对象的schedule()方法也类似

       ListeningExecutorService是ExecutorService的扩展类。submit()方法可以直接返回ListenableFuture。实例代码如下:

	//定义一个线程池,用于处理所有任务 -- MoreExecutors
	private final static ListeningExecutorService sService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
	

	/**
	 * 简单的使用ListenableFuture
	 */
	@Test
	public void taskCallbackTest() 

		final CountDownLatch latch = new CountDownLatch(2);
		final boolean normal = false;
		// 任务1 测试正常执行 简单的测试下任务的正常执行
		ListenableFuture<String> listenableFuture1 = sService.submit(() -> 
			// 模拟一个耗时操作
			Thread.sleep(10 * 1000);
			return "我是正常的";
		);
		Futures.addCallback(listenableFuture1, new FutureCallback<String>() 
			@Override
			public void onSuccess(String result) 
				System.out.println("任务1(测试正常执行)结果: 正常执行完成 -- " + result);
				latch.countDown();
			

			@Override
			public void onFailure(Throwable t) 
				System.out.println("任务1(测试正常执行)结果: 抛异常了");
				latch.countDown();
			
		, MoreExecutors.directExecutor());

		// 任务2 测试异常执行 简单的测试下任务执行过程中有异常情况
		ListenableFuture<String> listenableFuture2 = sService.submit(() -> 
			// 模拟一个耗时操作
			Thread.sleep(10 * 1000);
			if (normal) 
				return "我是正常的";
			 else 
				throw new Exception("哎呀,有异常了");
			
		);
		Futures.addCallback(listenableFuture2, new FutureCallback<String>() 
			@Override
			public void onSuccess(String result) 
				System.out.println("任务2(测试异常执行)结果: 正常执行完成 -- " + result);
				latch.countDown();
			

			@Override
			public void onFailure(Throwable t) 
				System.out.println("任务2(测试异常执行)结果: 抛异常了");
				latch.countDown();
			
		, MoreExecutors.directExecutor());

		try 
			// 等待所有的线程执行完
			latch.await();
		 catch (InterruptedException e) 
			e.printStackTrace();
		

	

3.2 给ListenableFuture添加FutureCallback

       给ListenableFuture类添加FutureCallback来监听ListenableFuture的完成。

       我们可以借助Futures.addCallback()给ListenableFuture添加FutureCallback来监听Future的执行结果。

		Futures.addCallback(future, new FutureCallback<String>() 
			@Override
			public void onSuccess(String result) 
				System.out.println("正常执行完成 -- " + result);
			

			@Override
			public void onFailure(Throwable t) 
				System.out.println("抛异常了");
			
		, MoreExecutors.directExecutor());

3.3 ListenableFuture的使用

       其实上面讲ListenableFuture的使用的时候,我们就已经吧ListenableFuture的简单使用随带讲了。

       这里我们在讲多个任务链式执行的使用。比如两个任务:任务A、任务B。在配合Futures.transformAsync()方法我们可以做到。还是直接用代码来说怎么使用,代码如下:

	//定义一个线程池,用于处理所有任务 -- MoreExecutors
	private final static ListeningExecutorService sService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
	

	/**
	 * Futures.transformAsync()支持多个任务链式异步执行,并且后面一个任务可以拿到前面一个任务的结果
	 */
	@Test
	public void multiTaskTransformAsyncTest() 

		final CountDownLatch latch = new CountDownLatch(1);

		// 第一个任务
		ListenableFuture<String> task1 = sService.submit(() -> 
			System.out.println("第一个任务开始执行...");
			try 
				Thread.sleep(10 * 1000);
			 catch (Exception e) 
				e.printStackTrace();
			
			return "第一个任务的结果";
		);

		// 第二个任务,里面还获取到了第一个任务的结果
		AsyncFunction<String, String> queryFunction = new AsyncFunction<String, String>() 
			public ListenableFuture<String> apply(String input) 
				return sService.submit(new Callable<String>() 
					public String call() throws Exception 
						System.out.println("第二个任务开始执行...");
						try 
							Thread.sleep(10 * 1000);
						 catch (Exception e) 
							e.printStackTrace();
						
						return input + " & 第二个任务的结果 ";
					
				);
			
		;

		// 把第一个任务和第二个任务关联起来
		ListenableFuture<String> first = Futures.transformAsync(task1, queryFunction, sService);

		// 监听返回结果
		Futures.addCallback(first, new FutureCallback<String>() 
			public void onSuccess(String result) 
				System.out.println("结果: " + result);
				latch.countDown();
			

			public void onFailure(Throwable t) 
				System.out.println(t.getMessage());
				latch.countDown();
			
		, MoreExecutors.directExecutor());

		try 
			// 等待所有的线程执行完
			latch.await();
		 catch (InterruptedException e) 
			e.printStackTrace();
		
	

       Google Guava的并发编程我们上面讲的非常简单。我们主要介绍了MoreExecutors、Futures类里面每个方法的含义。关于ListenableFuture、ListenableFutureTask、ListeningExecutorService、ListeningScheduledExecutorService这几个增强类的使用我们没怎么提。因为这几类的使用和Future、FutureTask、ExecutorService、ScheduledExecutorService的使用是类似的。

       文章中涉及到的代码在 https://github.com/tuacy/google-guava-study 工程代码里面测试 代码路径下可以找到。如果大家有什么疑问可以留言。

       如果大家对线程池相关的原理比较感兴趣可以看看我之前写的两篇文章。

以上是关于Google Guava 并发编程 - ListenableFuture的主要内容,如果未能解决你的问题,请参考以下文章

google Guava包的ListenableFuture解析

Day857.高性能限流器Guava RateLimiter -Java 并发编程实战

Day857.高性能限流器Guava RateLimiter -Java 并发编程实战

高效 告别996,开启java高效编程之门 5-1Guava开场

高效 告别996,开启java高效编程之门 5-1Guava开场

google Guava 快速入门