Java 中 Future 的 get 方法超时会怎样?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 中 Future 的 get 方法超时会怎样?相关的知识,希望对你有一定的参考价值。


一、背景

很多 Java 工程师在准备面试时,会刷很多八股文,线程和线程池这一块通常会准备线程的状态、线程的创建方式,Executors 里面的一些工厂方法和为什么不推荐使用这些工厂方法,​​ThreadPoolExecutor​​ 构造方法的一些参数和执行过程等。

工作中,很多人会使用线程池的 ​​submit​​​ 方法 获取 Future 类型的返回值,然后使用 ​​java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)​​ 实现“最多等多久”的效果。

但很多人对此的理解只停留在表面上,稍微问深一点点可能就懵逼了。

Java

比如,​​java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)​​ 超时之后,当前线程会怎样?线程池里执行对应任务的线程会有怎样的表现?

如果你对这个问题没有很大的把握,说明你掌握的还不够扎实。

最常见的理解就是,“超时以后,当前线程继续执行,线程池里的对应线程中断”,真的是这样吗?

二、模拟

2.1 常见写法

下面给出一个简单的模拟案例:

package basic.thread;

import java.util.concurrent.*;

public class FutureDemo
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException

ExecutorService executorService = Executors.newFixedThreadPool(2);

Future<?> future = executorService.submit(() ->
try
demo();
catch (InterruptedException e)
throw new RuntimeException(e);

);

String threadName = Thread.currentThread().getName();
System.out.println(threadName + "获取的结果 -- start");
Object result = future.get(100, TimeUnit.MILLISECONDS);
System.out.println(threadName + "获取的结果 -- end :" + result);



private static String demo() throws InterruptedException
String threadName = Thread.currentThread().getName();
System.out.println(threadName + ",执行 demo -- start");
TimeUnit.SECONDS.sleep(1);
System.out.println(threadName + ",执行 demo -- end");
return "test";

输出结果:

main获取的结果 -- start
pool-1-thread-1,执行 demo -- start
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at basic.thread.FutureDemo.main(FutureDemo.java:20)
pool-1-thread-1,执行 demo -- end

我们可以发现:当前线程会因为收到 TimeoutException 而被中断,线程池里对应的线程“却”继续执行完毕。

2.2 尝试取消

我们尝试对未完成的线程进行取消,发现 Future#cancel 有个 boolean 类型的参数。

/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when @code cancel is called,
* this task should never run. If the task has already started,
* then the @code mayInterruptIfRunning parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to @link #isDone will
* always return @code true. Subsequent calls to @link #isCancelled
* will always return @code true if this method returned @code true.
*
* @param mayInterruptIfRunning @code true if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return @code false if the task could not be cancelled,
* typically because it has already completed normally;
* @code true otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);

看源码注释我们可以知道:

当设置为 true 时,正在执行的任务将被中断(interrupted);

当设置为 false 时,如果任务正在执行中,那么仍然允许任务执行完成。

2.2.1 cancel(false)

此时,为了不让主线程因为超时异常被中断,我们 try-catch 包起来。

package basic.thread;

import org.junit.platform.commons.util.ExceptionUtils;

import java.util.concurrent.*;

public class FutureDemo
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException

ExecutorService executorService = Executors.newFixedThreadPool(2);

Future<?> future = executorService.submit(() ->
try
demo();
catch (InterruptedException e)
throw new RuntimeException(e);

);

String threadName = Thread.currentThread().getName();
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- start");
try
Object result = future.get(100, TimeUnit.MILLISECONDS);
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- end :" + result);
catch (Exception e)
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果异常:" + ExceptionUtils.readStackTrace(e));

future.cancel(false);
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- cancel");


private static String demo() throws InterruptedException
String threadName = Thread.currentThread().getName();
System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- start");
TimeUnit.SECONDS.sleep(1);
System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- end");
return "test";

结果:

1653751759233,main获取的结果 -- start
1653751759233,pool-1-thread-1,执行 demo -- start
1653751759343,main获取的结果异常:java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at basic.thread.FutureDemo.main(FutureDemo.java:23)

1653751759351,main获取的结果 -- cancel
1653751760263,pool-1-thread-1,执行 demo -- end

我们发现,线程池里的对应线程在 cancel(false) 时,如果已经正在执行,则会继续执行完成。

2.2.2 cancel(true)

package basic.thread;

import org.junit.platform.commons.util.ExceptionUtils;

import java.util.concurrent.*;

public class FutureDemo
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException

ExecutorService executorService = Executors.newFixedThreadPool(2);

Future<?> future = executorService.submit(() ->
try
demo();
catch (InterruptedException e)
System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ", Interrupted:" + ExceptionUtils.readStackTrace(e));
throw new RuntimeException(e);

);

String threadName = Thread.currentThread().getName();
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- start");
try
Object result = future.get(100, TimeUnit.MILLISECONDS);
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- end :" + result);
catch (Exception e)
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果异常:" + ExceptionUtils.readStackTrace(e));

future.cancel(true);
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- cancel");


private static String demo() throws InterruptedException
String threadName = Thread.currentThread().getName();
System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- start");
TimeUnit.SECONDS.sleep(1);
System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- end");
return "test";

执行结果:

1653752011246,main获取的结果 -- start
1653752011246,pool-1-thread-1,执行 demo -- start
1653752011347,main获取的结果异常:java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at basic.thread.FutureDemo.main(FutureDemo.java:24)

1653752011363,pool-1-thread-1, Interrupted:java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at basic.thread.FutureDemo.demo(FutureDemo.java:36)
at basic.thread.FutureDemo.lambda$main$0(FutureDemo.java:14)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

1653752011366,main获取的结果 -- cancel

可以看出,此时,如果目标线程未执行完,那么会收到 ​​InterruptedException​​ ,被中断。

当然,如果此时不希望目标线程被中断,可以使用 try-catch 包住,再执行其他逻辑。

package basic.thread;

import org.junit.platform.commons.util.ExceptionUtils;

import java.util.concurrent.*;

public class FutureDemo
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException

ExecutorService executorService = Executors.newFixedThreadPool(2);

Future<?> future = executorService.submit(() ->
demo();
);

String threadName = Thread.currentThread().getName();
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- start");
try
Object result = future.get(100, TimeUnit.MILLISECONDS);
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- end :" + result);
catch (Exception e)
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果异常:" + ExceptionUtils.readStackTrace(e));

future.cancel(true);
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- cancel");


private static String demo()
String threadName = Thread.currentThread().getName();
System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- start");
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo 被中断,自动降级");

System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- end");
return "test";

执行结果:

1653752219803,main获取的结果 -- start
1653752219803,pool-1-thread-1,执行 demo -- start
1653752219908,main获取的结果异常:java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at basic.thread.FutureDemo.main(FutureDemo.java:19)

1653752219913,main获取的结果 -- cancel
1653752219914,pool-1-thread-1,执行 demo 被中断,自动降级
1653752219914,pool-1-thread-1,执行 demo -- end

三、回归源码

我们直接看 ​​java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)​​ 的源码注释,就可以清楚地知道各种情况的表现:

/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

我们还可以选取几个常见的实现类,查看下实现的基本思路:

java.util.concurrent.FutureTask#get(long, java.util.concurrent.TimeUnit)

public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);

java.util.concurrent.CompletableFuture#get(long, java.util.concurrent.TimeUnit)

/**
* Waits if necessary for at most the given time for this future
* to complete, and then returns its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the result value
* @throws CancellationException if this future was cancelled
* @throws ExecutionException if this future completed exceptionally
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException
Object r;
long nanos = unit.toNanos(timeout);
return reportGet((r = result) == null ? timedGet(nanos) : r);
/**
* Returns raw result after waiting, or null if interrupted, or
* throws TimeoutException on timeout.
*/
private Object timedGet(long nanos) throws TimeoutException
if (Thread.interrupted())
return null;
if (nanos <= 0L)
throw new TimeoutException();
long d = System.nanoTime() + nanos;
Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
boolean queued = false;
Object r;
// We intentionally dont spin here (as waitingGet does) because
// the call to nanoTime() above acts much like a spin.
while ((r = result) == null)
if (!queued)
queued = tryPushStack(q);
else if (q.interruptControl < 0 || q.nanos <= 0L)
q.thread = null;
cleanStack();
if (q.interruptControl < 0)
return null;
throw new TimeoutException();

else if (q.thread != null && result == null)
try
ForkJoinPool.managedBlock(q);
catch (InterruptedException ie)
q.interruptControl = -1;



if (q.interruptControl < 0)
r = null;
q.thread = null;
postComplete();
return r;

​java.util.concurrent.Future#cancel​​ 也一样

/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when @code cancel is called,
* this task should never run. If the task has already started,
* then the @code mayInterruptIfRunning parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to @link #isDone will
* always return @code true. Subsequent calls to @link #isCancelled
* will always return @code true if this method returned @code true.
*
* @param mayInterruptIfRunning @code true if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return @code false if the task could not be cancelled,
* typically because it has already completed normally;
* @code true otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);

java.util.concurrent.FutureTask#cancel

public boolean cancel(boolean mayInterruptIfRunning) 
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try // in case call to interrupt throws exception
if (mayInterruptIfRunning)
try
Thread t = runner;
if (t != null)
t.interrupt();
finally // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);


finally
finishCompletion();

return true;

可以看到 mayInterruptIfRunning 为 true 时,会执行 Thread#interrupt 方法

java.util.concurrent.CompletableFuture#cancel

/**
* If not already completed, completes this CompletableFuture with
* a @link CancellationException. Dependent CompletableFutures
* that have not already completed will also complete
* exceptionally, with a @link CompletionException caused by
* this @code CancellationException.
*
* @param mayInterruptIfRunning this value has no effect in this
* implementation because interrupts are not used to control
* processing.
*
* @return @code true if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning)
boolean cancelled = (result == null) &&
internalComplete(new AltResult(new CancellationException()));
postComplete();
return cancelled || isCancelled();

通过注释我们也发现,不同的实现类对参数的“效果”也有差异。

四、总结

我们学习时不应该想当然,不能纸上谈兵,对于不太理解的地方,可以多看源码注释,多看源码,多写 DEMO 去模拟或调试。


创作不易,如果本文对你有帮助,欢迎点赞、收藏加关注,你的支持和鼓励,是我创作的最大动力。

Java


java 方法 执行超时处理

在java中如何监控一个java方法执行是否超时,该java方法的执行时间大约为20s,返回值为boolean类型,如果在20s内执行完则继续执行其他任务,如果执行时间超过20s,即中断当前执行的该方法,并向日志文件中写入如下内容“当前时间:xxx执行超时,请检查”,在网上看了好多资料,都写得云里雾里的。悬赏50分啊。

  java 1.5以上的Future类可以执行超时处理。

  jdk1.5自带的并发库中Future类中重要方法包括get()和cancel(),get()获取数据对象,如果数据没有加载,就会阻塞直到取到数据,而 cancel()是取消数据加载。另外一个get(timeout)操作,表示如果在timeout时间内没有取到就失败返回,而不再阻塞。

  代码如下:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


import com.sun.corba.se.impl.orbutil.closure.Future;
import com.sun.corba.se.impl.orbutil.threadpool.TimeoutException;
public class ThreadTest   
  
    public static void main(String[] args) throws InterruptedException,  
            ExecutionException   
          
        final ExecutorService exec = Executors.newFixedThreadPool(1);  
          
        Callable<String> call = new Callable<String>()   
            public String call() throws Exception   
                //开始执行耗时操作  
                Thread.sleep(1000 * 5);  
                return "线程执行完成.";  
              
        ;  
          
        try   
            Future<String> future = exec.submit(call);  
            String obj = future.get(1000 * 1, TimeUnit.MILLISECONDS); //任务处理超时时间设为 1 秒  
            System.out.println("任务成功返回:" + obj);  
         catch (TimeoutException ex)   
            System.out.println("处理超时啦....");  
            ex.printStackTrace();  
         catch (Exception e)   
            System.out.println("处理失败.");  
            e.printStackTrace();  
          
        // 关闭线程池  
        exec.shutdown();  
      
参考技术A 写一个监听线程就ok啦,执行方法前启动一个线程,20s后去看那个方法有没有结束(可以搞个标记用于判断) 参考技术B 这个问题没有太好的安全解决办法。
你可以把方法封装在线程里,
然后外部用一个定时线程延时20S苏醒看看那个工作线程执行结束了没有,
如果没有,那么就设置工作线程的结束标志,等待工作线程结束。
在工作线程内部加上判断结束标志的代码,一旦为真马上退出本函数。
参考技术C 可以在方法里面加定时器, 从进方法开始。
schedule(TimerTask task, long delay, long period)
TimerTask :定时任务,可以在里面写一个中断或者返回。
具体逻辑:
mytask
设置定时任务(myTimerTask),20s即20000ms后执行“中断”;
这里是要执行的该任务的内容;
执行任务完成后取消定时任务并正常返回;


myTimerTask
执行中断操作;
写入错误日志;
追问

哥,你讲的也太粗了吧。。。。给点详细思路。我对这个定时器不太熟悉哎。

追答

定时器类Timer在java.util包中。使用时,先实例化,然后使用实例的schedule(TimerTask task, long delay)方法,设定指定的任务task在指定的延迟delay后执行。定时器任务类TimerTask是抽象类,继承并重写其run()方法,可实现具体任务。

你可以查一下API。

追问

是这样的我的整个系统已经设置了定时器,就是当系统启动的时候定时器就启动,系统的主体任务10min执行一次,任务主要是用循环的方式从其它平台获取数据的,但是如果有一个平台的系统因为性能原因卡死的话我这个系统就会卡在那边没反应,我是想设置一个超时检查,当循环获取数据的时候有一个卡死就提示出错,不太可能使用嵌套的定时器去控制数据的获取,不知道你是否知道如何使用 响应超时这方面的方法。

追答

加一个拦截器,拦截你想监控的操作, 拦截器里面写定时器, 不过这样的话会牺牲很多效率

本回答被提问者和网友采纳
参考技术D 搞个线程试试

以上是关于Java 中 Future 的 get 方法超时会怎样?的主要内容,如果未能解决你的问题,请参考以下文章

Java 中 Future 的 get 方法超时会怎样?

java 方法 执行超时处理

Future和Promise

Java Spring 切面 aop 超时?

future.get 超时 啥原因

为啥 Java Future.get(timeout) 不可靠?