Guava:使用ListenableFuture来执行有返回值的线程任务
Posted 水田如雅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Guava:使用ListenableFuture来执行有返回值的线程任务相关的知识,希望对你有一定的参考价值。
首先来定义一个任务:
@Getter
@Setter
@AllArgsConstructor
class GetThreadReturnResult implements Callable<String>, Serializable
private static final long serialVersionUID = 2277437296426921203L;
private String id;
@Override
public String call() throws Exception
Thread.sleep(1000);
return id;
先来写个简单的:
@Test
public void test2() throws Exception
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Future<String>> futureQueue = new ConcurrentLinkedQueue<>();
List<String> idList = Lists.newArrayList("a", "b", "c");
ExecutorService rateService = new ThreadPoolExecutor(4, 8, 240, TimeUnit.SECONDS, new LinkedBlockingQueue<>(500), new ThreadPoolExecutor.CallerRunsPolicy());
long start = System.currentTimeMillis();
for (String id : idList)
GetThreadReturnResult t = new GetThreadReturnResult(id);
Future<String> stringFuture = rateService.submit(t);
futureQueue.offer(stringFuture);
while (!futureQueue.isEmpty())
if (futureQueue.peek().isDone() && !futureQueue.peek().isCancelled())
queue.offer(futureQueue.poll().get());
System.out.println(JSONObject.toJSONString(queue));
System.out.println("cost:" + (System.currentTimeMillis() - start));
把所有执行结果Future放入一个队列,之后对这个队列进行取结果操作,但是可能出现这种情况,由于线程执行是乱序的,当前拿到队首的结果是没执行完的,但是拿到的队尾的结果是执行完了的,但是队首的一直执行不完,导致取结果卡在这里,其实还不是很好的能去去取结果的一个操作。
Guava里面提供了回调的操作,去获取结果:
@Test
public void test3() throws Exception
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
List<String> idList = Lists.newArrayList("a", "b", "c");
CountDownLatch countDownLatch = new CountDownLatch(idList.size());
long start = System.currentTimeMillis();
for (String id : idList)
ListenableFuture<String> stringFuture = executorService.submit(new GetThreadReturnResult(id));
Futures.addCallback(stringFuture, new FutureCallback<String>()
@Override
public void onSuccess(@Nullable String result)
queue.offer(result);
countDownLatch.countDown();
@Override
public void onFailure(Throwable t)
countDownLatch.countDown();
System.out.println(JSONObject.toJSONString(t));
, executorService);
countDownLatch.await();
System.out.println(JSONObject.toJSONString(queue));
System.out.println("cost:" + (System.currentTimeMillis() - start));
callback实现:
public static <V> void addCallback(final ListenableFuture<V> future,
final FutureCallback<? super V> callback, Executor executor)
Preconditions.checkNotNull(callback);
Runnable callbackListener = new Runnable()
@Override
public void run()
final V value;
try
// TODO(user): (Before Guava release), validate that this
// is the thing for IE.
value = getUninterruptibly(future);
catch (ExecutionException e)
callback.onFailure(e.getCause());
return;
catch (RuntimeException e)
callback.onFailure(e);
return;
catch (Error e)
callback.onFailure(e);
return;
callback.onSuccess(value);
;
future.addListener(callbackListener, executor);
//....
public static <V> V getUninterruptibly(Future<V> future)
throws ExecutionException
boolean interrupted = false;
try
while (true)
try
return future.get();
catch (InterruptedException e)
interrupted = true;
finally
if (interrupted)
Thread.currentThread().interrupt();
这里其实是单独开了个线程去调用future的get方法,当执行完成之后,再去执行callback回调。
以上是关于Guava:使用ListenableFuture来执行有返回值的线程任务的主要内容,如果未能解决你的问题,请参考以下文章
Guava:使用ListenableFuture来执行有返回值的线程任务
Guava:使用ListenableFuture来执行有返回值的线程任务
Guava:使用ListenableFuture来执行有返回值的线程任务