Guava:使用ListenableFuture来执行有返回值的线程任务

Posted 水田如雅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Guava:使用ListenableFuture来执行有返回值的线程任务相关的知识,希望对你有一定的参考价值。

首先来定义一个任务:

  @Getter
    @Setter
    @AllArgsConstructor
    class GetThreadReturnResult implements Callable<String>, Serializable 
        private static final long serialVersionUID = 2277437296426921203L;
        private String id;

        @Override
        public String call() throws Exception 
            Thread.sleep(1000);
            return id;
        
    

先来写个简单的:

 @Test
    public void test2() throws Exception 
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        ConcurrentLinkedQueue<Future<String>> futureQueue = new ConcurrentLinkedQueue<>();
        List<String> idList = Lists.newArrayList("a", "b", "c");
        ExecutorService rateService = new ThreadPoolExecutor(4, 8, 240, TimeUnit.SECONDS, new LinkedBlockingQueue<>(500), new ThreadPoolExecutor.CallerRunsPolicy());
        long start = System.currentTimeMillis();
        for (String id : idList) 
            GetThreadReturnResult t = new GetThreadReturnResult(id);
            Future<String> stringFuture = rateService.submit(t);
            futureQueue.offer(stringFuture);
        
        while (!futureQueue.isEmpty()) 
            if (futureQueue.peek().isDone() && !futureQueue.peek().isCancelled()) 
                queue.offer(futureQueue.poll().get());
            
        
        System.out.println(JSONObject.toJSONString(queue));
        System.out.println("cost:" + (System.currentTimeMillis() - start));
    

把所有执行结果Future放入一个队列,之后对这个队列进行取结果操作,但是可能出现这种情况,由于线程执行是乱序的,当前拿到队首的结果是没执行完的,但是拿到的队尾的结果是执行完了的,但是队首的一直执行不完,导致取结果卡在这里,其实还不是很好的能去去取结果的一个操作。

Guava里面提供了回调的操作,去获取结果:

@Test
    public void test3() throws Exception 
        ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        List<String> idList = Lists.newArrayList("a", "b", "c");
        CountDownLatch countDownLatch = new CountDownLatch(idList.size());
        long start = System.currentTimeMillis();
        for (String id : idList) 
            ListenableFuture<String> stringFuture = executorService.submit(new GetThreadReturnResult(id));
            Futures.addCallback(stringFuture, new FutureCallback<String>() 
                @Override
                public void onSuccess(@Nullable String result) 
                    queue.offer(result);
                    countDownLatch.countDown();
                

                @Override
                public void onFailure(Throwable t) 
                    countDownLatch.countDown();
                    System.out.println(JSONObject.toJSONString(t));
                
            , executorService);
        
        countDownLatch.await();
        System.out.println(JSONObject.toJSONString(queue));
        System.out.println("cost:" + (System.currentTimeMillis() - start));

    

callback实现:

 public static <V> void addCallback(final ListenableFuture<V> future,
      final FutureCallback<? super V> callback, Executor executor) 
    Preconditions.checkNotNull(callback);
    Runnable callbackListener = new Runnable() 
      @Override
      public void run() 
        final V value;
        try 
          // TODO(user): (Before Guava release), validate that this
          // is the thing for IE.
          value = getUninterruptibly(future);
         catch (ExecutionException e) 
          callback.onFailure(e.getCause());
          return;
         catch (RuntimeException e) 
          callback.onFailure(e);
          return;
         catch (Error e) 
          callback.onFailure(e);
          return;
        
        callback.onSuccess(value);
      
    ;
    future.addListener(callbackListener, executor);
  
//....
public static <V> V getUninterruptibly(Future<V> future)
      throws ExecutionException 
    boolean interrupted = false;
    try 
      while (true) 
        try 
          return future.get();
         catch (InterruptedException e) 
          interrupted = true;
        
      
     finally 
      if (interrupted) 
        Thread.currentThread().interrupt();
      
    
  

这里其实是单独开了个线程去调用future的get方法,当执行完成之后,再去执行callback回调。

以上是关于Guava:使用ListenableFuture来执行有返回值的线程任务的主要内容,如果未能解决你的问题,请参考以下文章

Guava:使用ListenableFuture来执行有返回值的线程任务

Guava:使用ListenableFuture来执行有返回值的线程任务

Guava:使用ListenableFuture来执行有返回值的线程任务

google Guava包的ListenableFuture解析

GUAVA-ListenableFuture实现回调

从Java Future到Guava ListenableFuture实现异步调用