Thread专题 - 取消和关闭

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Thread专题 - 取消和关闭相关的知识,希望对你有一定的参考价值。

此文被笔者收录在系列文章 ​​​架构师必备(系列)​​ 中

java中没有提供任何机制,来安全是强迫线程停止手头的工作,Thread.stop和Thread.suspend方法存在严重的缺陷,不能使用。但每个Thread提供了Interruption中断,一种协作机制来协调线程间的操作和控制。这是JAVA中推荐的方式。程序不应该立即停止,应该采用中断这种协作机制来处理,正确的做法是:先清除当前进程中的工作,再终止。正常有四种方法:

  • 正常结束;
  • 设置一个标志位,由外部线程来控制,原理是设置一个volatile变量,使线程池不在创建新线程达到平滑关闭的效果;适合一直在运行的长时间任务;
  • 阻塞线程:用interrupt()方法会马上抛出异常,捕获到这个线程后break跳出强制关闭;
  • 未阻塞线程:用interrupt()方法设置中断标志位,然后在循环时用isInterrupted()来判断中断标志位,其实和自定义标志位一样原理;

一、任务取消

当外部代码能在活动自然完成之前,把它更改为完成状态,被称为取消。取消的原因很多种可能,比如:用户请求、限时活动、应用程序设计如此、错误、关闭。

java中没有一种绝对安全停止线程的方法,只能选择相互协作的机制,通过协作,使任务和代码遵循一个统一的协议,用来请求取消。一个可取消的任务必须有取消策略,这个策略是一套程序,规定了不同任务或机制间的协作,保证数据的统一。

@ThreadSafe
public class PrimeGenerator implements Runnable
private static ExecutorService exec = Executors.newCachedThreadPool();

@GuardedBy("this")
private final List<BigInteger> primes = new ArrayList<BigInteger>();
private volatile boolean cancelled;

public void run()
BigInteger p = BigInteger.ONE;
while (!cancelled)
p = p.nextProbablePrime();
synchronized (this)
primes.add(p);




public void cancel()
cancelled = true;


public synchronized List<BigInteger> get()
return new ArrayList<BigInteger>(primes);


//让上一个程序在1秒后停止,但这并不是严格的一秒,可能存在误差。
static List<BigInteger> aSecondOfPrimes() throws InterruptedException
PrimeGenerator generator = new PrimeGenerator();
exec.execute(generator);
try
SECONDS.sleep(1);
finally
generator.cancel();

return generator.get();//这是一个自定义的非阻塞的方法

中断

线程中断是一个协作机制,一个线程给另一个线程发送信号,通知它在方便或可能的情况下停止正在做的工作,去做其他事情 。但实际上,使用中断来处理取消之外的任何事情都是不明智的,中断通常是实现取消最明知的选择,一般在取消方法中设置中断状态。

每个线程都有一个boolean的中断状态,Thread包含一些中断线程的方法:interrupt方法中断目标线程,isInterrupted返回目标线程的中断状态,interrupted用于清除当前线程的中断状态,这是清除中断状态唯一的方法。中断一般不能与可阻塞的函数一起使用。

静态的interrupted方法应该小心使用,它会清除并发线程的中断状态,如果返回了true,必须进行处理,如果想掩盖这个中断,可以抛出 InterruptedException 异常或者再次调用interrupt来保存中断状态。

调用interrupt并不意味着必须停止目标线程正在进行的工作,它仅仅传递了请求中断的信息,意味着完成当前任务,保证数据结构的统一,然后在下一周期结束。有一些方法对这样的请求很重视,比如wait,sleep(阻塞方式),当它们接到中断请求时会抛出一个异常,或者进入时中断状态就已经被设置了。所以这两个方法尽量不要用。

下例中在两处用到了检测中断技术,因为put是个阻塞操作,所以在之前检测总比在之后检测性能更好。前提是此时没有消费者线程或是put是个很耗时的操作,像这种对中断状态进行显式的检测会对调用可中断的阻塞方法时很有用外,因为通常我们不能得到期望的响应。

public class PrimeProducer extends Thread 
private final BlockingQueue<BigInteger> queue;

PrimeProducer(BlockingQueue<BigInteger> queue)
this.queue = queue;

//这个例子可以很好的处理阻塞操作的中断问题,通过中断。
public void run()
try
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
catch (InterruptedException consumed)
/* Allow thread to exit */



public void cancel()
interrupt();

中断策略

正如需要为任务制定取消策略一样,也应该定制线程中断策略。一个中断策略决定线程如何应对中断请求--当发现中断请求时,它会做什么。

区分任务和线程对中断的反应是很重要的,任务不会在自己拥有的线程中执行,它们借用属于服务的线程,比如线程池,如果代码并不是线程的所有者就应该小心地保存中断状态(如果你给主人打扫房间,主人不在的这段时间你不能把收到的邮件全丢掉,应该收起来待主人回来再处理)。这就是为什么大多数可阻塞的库函数,仅仅抛出InterruptedException做为中断的响应,这也是最合理的策略。

在理中断时应该保存中断状态,也不能简单的是把InterruptedException传递给调用者,应该在它之后恢复中断的状态:Thread.currentThread().interrupt();当检查到中断请求时,任务不需要放弃所有的事情,可以选择推迟直到更合适的时机。这需要记得它已经被请求过中断了,完成当前正在进行的任务,再抛出中断异常或指明中断,这种技术可以保证数据结构不被破坏。

响应中断

有两种处理InterruptedException的实用策略:传递异常和恢复中断状态,使你的方法也成为可中断的阻塞方法,或者保存中断状态,上层调用者代码能对其进行处理。

如果不想或不能传递 InterruptedException 异常,需要另一种方式保存中断请求,因为大多数代码并不知道它们在哪个线程中运行,并再次调用interrupt来恢复中断状态,而不应该掩盖 InterruptedException 异常,如果你的代码没有相应的处理程序,就不应该在catch中捕获这个异常。过早设置中断可能会引起无限循环。

不可取消的任务在任务退出前保存中断

public Task getNextTask(BlockingQueue<Task> queue) 
boolean interrupted = false;
try
while (true)
try
return queue.take();
catch (InterruptedException e)
interrupted = true;
// fall through and retry


finally
if (interrupted)
Thread.currentThread().interrupt();

在中断线程之前,应该了解它的中断策略,且不要在外部线程中安排中断。如果要在一个专门的线程中中断任务,这里用到了jion方法,这个方法有不足之处,它如果传一个超时参数,无法确定是由于异常还是因为超时退出的状态。

public class TimedRun2 
private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1);

public static void timedRun(final Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException
class RethrowableTask implements Runnable
private volatile Throwable t;

public void run()
try
r.run();
catch (Throwable t)
this.t = t;



void rethrow()
if (t != null)
throw launderThrowable(t);



RethrowableTask task = new RethrowableTask();
final Thread taskThread = new Thread(task);
taskThread.start();
cancelExec.schedule(new Runnable()
public void run()
taskThread.interrupt();

, timeout, unit);
taskThread.join(unit.toMillis(timeout));
task.rethrow();

通过Future取消

Future有一个cancel方法,它需要一个boolean参数,它的返回值表示取消尝试是否成功(这仅仅告诉你它是否能接收中断,而不是任务是否检测并处理了中断)。

如果为true并且任务正在一线程中运行,那么这个线程是应该中断的。如果是false并且任务还没启动的话,那这个任务永远不会启动了。除非知道线程的中断策略,否则不应该中断线程,这个例子中cancel何时设置true和false需要考虑。

但任务执行线程是由标准的Executor实现创建的,它实现了一个中断策略,使得任务可以通过中断被取消,这时cancel是安全的。通过Future来中断任务并不影响线程池中其它的线程。在一个专门的线程中中断任务。通过Future来取消任务。

public static void timedRun(Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException
Future<?> task = taskExec.submit(r);
try
task.get(timeout, unit);
catch (TimeoutException e)
// 下面任务会被取消
catch (ExecutionException e)
// task中抛出的异常,重抛出
throw launderThrowable(e.getCause());
finally
// 如果任务已经取消,是无害的
task.cancel(true); // interrupt if running,如果为false表示如果还没有启动的话,不要运行这个任务,用于那些不处理中断的任务。

处理不可中断阻塞

很多可阻塞的库方法通过提前返回和抛出InterruptedException来实现对中断的响应,这使得构建可以响应取消的任务更加容易。但有些阻塞方法或阻塞机制并不响应中断。但可以通过与中断类似的手段,来确保可以停止这些线程,前提是我们需要清楚地知道线程为什么会阻塞。

下例展现了一项用来封装非标准取消的技术,为了方便终止一个用户的连接或关闭服务器。重写了interrupt方法。

public class ReaderThread extends Thread 
private static final int BUFSZ = 512;
private final Socket socket;
private final InputStream in;

public ReaderThread(Socket socket) throws IOException
this.socket = socket;
this.in = socket.getInputStream();


public void interrupt()
try
socket.close();
catch (IOException ignored)
finally
super.interrupt();



public void run()
try
byte[] buf = new byte[BUFSZ];
while (true)
int count = in.read(buf);
if (count < 0)
break;
else if (count > 0)
processBuffer(buf, count);

catch (IOException e) /* Allow thread to exit */



public void processBuffer(byte[] buf, int count)

用newTaskFor钩子方法封装非标准取消

在上个例子中,可以使用newTaskFor钩子函数来改进用来封装非标准取消的方法。这是java 6 中添加到ThreadPoolExecutor的新特性,当提交一个Callable给ExecutorService时,submit返回一个Future,可以用Future来取消任务。newTaskFor钩子是一个工厂方法,创建Future来代表任务,它返回一个RunnableFuture接口,它扩展了Future和Runnable由FutureTask来实现。

自定义的任务Future可以重写canel方法,实现日志或收集取消的统计信息。也可以通过重写Thread.interrupt()实现上面的非标准取消功能。

public interface CancellableTask<T> extends Callable<T> 
void cancel();
RunnableFuture<T> newTask();


public class CancellingExecutor extends ThreadPoolExecutor
protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable)
if (callable instanceof CancellableTask)
return ((CancellableTask<T>) callable).newTask();
else
return super.newTaskFor(callable);



public abstract class SocketUsingTask <T> implements CancellableTask<T>
@GuardedBy("this") private Socket socket;
protected synchronized void setSocket(Socket s)
socket = s;

public synchronized void cancel()
try
if (socket != null)
socket.close();
catch (IOException ignored)


public RunnableFuture<T> newTask()
return new FutureTask<T>(this)
public boolean cancel(boolean mayInterruptIfRunning)
try
SocketUsingTask.this.cancel();
finally
return super.cancel(mayInterruptIfRunning);


;

如果SocketUsingTask通过自身Future被取消,执行线程会被中断,这提高了任务对取消的响应性,这样做在保证响应取消的同时,不仅可以安全地调用可中断方法,还可以调用阻塞中的Socket I/O方法。

二、停止基于线程的服务

由于java不提供退出线程惯用的方法,所以需要自行编码结束。实践指出,我们不应该操控某个线程--中断它、改变它的优先级等等,除非你拥有这个线程。 线程通过一个Thread对象表示,线程的所有权也是不能被传递的,但线程可以被自由的共享。

一般的应用程序会有三个部分组成:应用程序拥有服务,服务拥有工作线程,但应用程序并不拥有工作线程,因此应用程序如果想控制线程只能通过服务来处理。就像线程池拥有工作者线程一样。服务比如ExecutorService应该提供生命周期方法来关闭它自己并关闭它拥有的线程。

//这是一个多生产者,单消费者设计,日志信息通过BlockingQueue移交给日志线程
public class LogService
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
@GuardedBy("this") private boolean isShutdown;
@GuardedBy("this") private int reservations;

public LogService(Writer writer)
this.queue = new LinkedBlockingQueue<String>();
this.loggerThread = new LoggerThread();
this.writer = new PrintWriter(writer);


public void start()
loggerThread.start();


public void stop()
synchronized (this)
isShutdown = true;

loggerThread.interrupt();


public void log(String msg) throws InterruptedException
synchronized (this)
if (isShutdown)
throw new IllegalStateException(/*...*/);
++reservations;

queue.put(msg);

//内部类
private class LoggerThread extends Thread
public void run()
try
while (true)
try
synchronized (LogService.this)
if (isShutdown && reservations == 0)
break;

String msg = queue.take();
synchronized (LogService.this)
--reservations;

writer.println(msg);
catch (InterruptedException e) /* retry */


finally
writer.close();



另一种更高级的方法。复杂的程序可能把会ExecutorService封装在一个更高层级的服务中,通过增加链接,把所有权链从应用程序扩展到服务,再到线程,每一个链上的成员管理它所拥有的服务或线程的生命周期。

public class LogService1 
private final ExecutorService exec =
public void start()
public void stop() throws InterruptedException
exec.shutdown();
exec.awaitTermination(timeout, unit);
writer.close();
//LogService委托给ExecutorService执行,LogService管理自己的生命周期
public void log(String msg)
try
exec.execute(new WriteTask(msg));
catch(Exception e)


致命药丸

另一种保证生产--消费服务关闭的方式是使用poison pill:一个可识别的对象,置于队列中,意味着“当你得到它时或得到一定数量时,停止一切工作”。这种方式适合在生产--消费数量已知的情况下使用。不过在生产--消费者数量较大时很难处理,致命药丸只在无限队列中工作时,才是可靠的。

//生产者线程
class CrawlerThread extends Thread
public void run()
try
crawl(root);
catch (InterruptedException e) /* fall through */
finally
while (true)
try
queue.put(POISON);
break;
catch (InterruptedException e1) /* retry */





private void crawl(File root) throws InterruptedException
File[] entries = root.listFiles(fileFilter);
if (entries != null)
for (File entry : entries)
if (entry.isDirectory())
crawl(entry);
else if (!alreadyIndexed(entry))
queue.put(entry);



//消费者线程
class IndexerThread extends Thread
public void run()
try
while (true)
File file = queue.take();
if (file == POISON)
break;
else
indexFile(file);

catch (InterruptedException consumed)



public void indexFile(File file)
/*...*/
;
public class IndexingService 
private static final int CAPACITY = 1000;
private static final File POISON = new File("");
private final IndexerThread consumer = new IndexerThread();
private final CrawlerThread producer = new CrawlerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;

public IndexingService(File root, final FileFilter fileFilter)
this.root = root;
this.queue = new LinkedBlockingQueue<File>(CAPACITY);
this.fileFilter = new FileFilter()
public boolean accept(File f)
return f.isDirectory() || fileFilter.accept(f);

;


private boolean alreadyIndexed(File f)
return false;


public void start()
producer.start();
consumer.start();


public void stop()
producer.interrupt();


public void awaitTermination() throws InterruptedException
consumer.join();

只执行一次的服务

如果一个方法需要处理一批任务,并在所有任务结束前不会返回,那么可以通过私有的Executor来简化服务的生命周期管理,其中Executor的寿命限定在该方法中(通常会用到invokeAll和invokeAny方法):向每个主机提交任务,在这之后,当所有检查邮件的任务完成后,会关闭Executor,并等待结束。

public boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)
throws InterruptedException
ExecutorService exec = Executors.newCachedThreadPool();
//为了从内部Runnable访问hasNewMail标志,它必须是final类型的,才能避免被更改
final AtomicBoolean hasNewMail = new AtomicBoolean(false);
try
for (final String host : hosts)
exec.execute(new Runnable()
public void run()
if (checkMail(host))
hasNewMail.set(true);

);
finally
exec.shutdown();
exec.awaitTermination(timeout, unit);

return hasNewMail.get();

TrackingExecutor任务跟踪

但这个方法会强制中断正在运行的任务,也就是无法区分哪些任务正在执行中,哪些执行完了,必须自己通过设置检查点来区分,如果不处理可能会造成数据的不一致性。

但可以通过扩展AbstractExecutorService来区分取消和中止的任务。 如TrackingExecutor可以识别那些已经开始,但没有正常结束的任务。任务必须在返回时保存线程的中断状态。TrackingExecutorService例子说明了为后续执行来保存未完成的任务。

public class TrackingExecutor extends AbstractExecutorService 
private final ExecutorService exec;
private final Set<Runnable> tasksCancelledAtShutdown =
Collections.synchronizedSet(new HashSet<Runnable>());

public TrackingExecutor(ExecutorService exec)
this.exec = exec;

//返回被取消(已经开始,但没有正常结束)的任务清单
public List<Runnable> getCancelledTasks()
if (!exec.isTerminated())
throw new IllegalStateException(/*...*/);
return new ArrayList<Runnable>(tasksCancelledAtShutdown);


public void execute(final Runnable runnable)
exec.execute(new Runnable()
public void run()
try
runnable.run();
finally
if (isShutdown()
&& Thread.currentThread().isInterrupted())
tasksCancelledAtShutdown.add(runnable);


);

public abstract class WebCrawler 
private volatile TrackingExecutor exec;
@GuardedBy("this") private final Set<URL> urlsToCrawl = new HashSet<URL>();

private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();
private static final long TIMEOUT = 500;
private static final TimeUnit UNIT = MILLISECONDS;

public WebCrawler(URL startUrl)
urlsToCrawl.add(startUrl);

public synchronized void start()
exec = new TrackingExecutor(Executors.newCachedThreadPool());
for (URL url : urlsToCrawl) submitCrawlTask(url);
urlsToCrawl.clear();


public synchronized void stop() throws InterruptedException
try
saveUncrawled(exec.shutdownNow());
if (exec.awaitTermination(TIMEOUT, UNIT))
saveUncrawled(exec.getCancelledTasks());
finally
exec = null;



protected abstract List<URL> processPage(URL url);

private void saveUncrawled(List<Runnable> uncrawled)
for (Runnable task : uncrawled)
urlsToCrawl.add(((CrawlTask) task).getPage());


private void submitCrawlTask(URL u)
exec.execute(new CrawlTask(u));


private class CrawlTask implements Runnable
private final URL url;

CrawlTask(URL url)
this.url = url;


private int count = 1;

boolean alreadyCrawled()
return seen.putIfAbsent(url, true) != null;


void markUncrawled()
seen.remove(url);
System.out.printf("marking %s uncrawled%n", url);


public void run()
for (URL link : processPage(url))
if (Thread.currentThread().isInterrupted())
return;
submitCrawlTask(link);



public URL getPage()
return url;


三、处理反常的线程终止

导致线程dead的主要原因是RuntimeException,因为这种异常错误是不可修复的。

下面的例子阐述了如何在线程池内部构建一个工作者线程,如果任务抛出了一个未检查的异常,它将允许线程终结,但是会首先通知框架,线程已经终结。然后,框架可能会用新的线程取代这个工作线程,也可能不这么做,因为线程池也许正在关闭,抑或当前已有足够多的线程,能够满足需要了。ThreadPoolExecutor和Swing使用这项技术来确保那些不能正常运转的任务不会影响到后续任务的执行。

需查异常的处理

典型线程池的工作者线程的构建

public void run()
Throwable thrown = null;
try
while(!isInterrupted())
runTask(getTaskFromWorkQueue());
catch(Throwable e)
thrown = e;
finally
threadExited(this, thrown);

不需查异常的处理

上面讲到了不需查异常的处理,线程的API同样提供了UncaughtExceptionHandler工具,便于监测到线程因不需查异常引起的dead。这两个方案互为补充,可以有效防止线程的泄漏问题。

当一个线程因为不需查异常退出时,JVM会把这个事件报告给应用程序自定义的Handler。如果handler不存在,默认会用System.err打印信息。至于hander如何处理取决于应

以上是关于Thread专题 - 取消和关闭的主要内容,如果未能解决你的问题,请参考以下文章

并发编程专题-线程的创建方式

并发编程专题-线程的创建方式

JVM技术专题Thread的stackSize与-Xss参数的区别「分析篇」

第十七届智能汽车竞赛RT-Thread | 逐飞联合直播 --群体挑战赛专题

Java专题十六:定时任务

Java面试手册:线程专题 ③