netty源码之关闭服务
Posted better_hui
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty源码之关闭服务相关的知识,希望对你有一定的参考价值。
目录
1、进入workerGroup.shutdownGracefully()
一、前言
这里的关闭包含了boss/worker两个线程,是netty提供的优雅退出。
二、客户端的关闭服务
1、进入workerGroup.shutdownGracefully()
不断跟进源码,进入到了SingleThreadEventExecutor的shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)方法中
//注意这里的关闭是设置状态位 @Override public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod"); if (timeout < quietPeriod) { throw new IllegalArgumentException( "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))"); } ObjectUtil.checkNotNull(unit, "unit"); if (isShuttingDown()) { return terminationFuture(); } boolean inEventLoop = inEventLoop(); boolean wakeup; int oldState; for (;;) { if (isShuttingDown()) { return terminationFuture(); } int newState; wakeup = true; oldState = state; if (inEventLoop) { newState = ST_SHUTTING_DOWN; } else { switch (oldState) { case ST_NOT_STARTED: case ST_STARTED://如果原来是启动的状态 newState = ST_SHUTTING_DOWN;//设置为正在关闭状态 break; default: newState = oldState; wakeup = false; } } // CAS更改状态 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { break; } } gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); gracefulShutdownTimeout = unit.toNanos(timeout); if (ensureThreadStarted(oldState)) { return terminationFuture; } if (wakeup) { taskQueue.offer(WAKEUP_TASK); if (!addTaskWakesUp) { wakeup(inEventLoop); } } return terminationFuture(); }
2、NioEventLoop.run() 关闭的入口
3、closeAll()
4、confirmShutdown()方法
跟踪源码,进入SingleThreadEventExecutor
的confirmShutdown()
方法
protected boolean confirmShutdown() { if (!isShuttingDown()) { return false; } if (!inEventLoop()) { throw new IllegalStateException("must be invoked from an event loop"); } // 取消所有的定时任务 cancelScheduledTasks(); if (gracefulShutdownStartTime == 0) { // 计算关闭的任务开始的时间 gracefulShutdownStartTime = ScheduledFutureTask.nanoTime(); } // 如果有task/hook则不关闭, if (runAllTasks() || runShutdownHooks()) { if (isShutdown()) { // Executor shut down - no new tasks anymore. return true; } // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or // terminate if the quiet period is 0. // See https://github.com/netty/netty/issues/4241 if (gracefulShutdownQuietPeriod == 0) { return true; } taskQueue.offer(WAKEUP_TASK); return false; } final long nanoTime = ScheduledFutureTask.nanoTime(); // 比较是否超过了可以容忍的关闭的最大时间 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) { return true; } // 关闭任务的静默期内,通过lastExecutionTime判断是否有任务执行 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) { // Check if any tasks were added to the queue every 100ms. // TODO: Change the behavior of takeTask() so that it returns on timeout. taskQueue.offer(WAKEUP_TASK); try { Thread.sleep(100);// 如果有任务就sleep,让其不关闭 } catch (InterruptedException e) { // Ignore } return false; } // 静默期内没有任务执行,马上关闭 // No tasks were added for last quiet period - hopefully safe to shut down. // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.) return true; }
5、cleanup() 关闭selector
最后还有一件很重要的事,SingleThreadEventExecutor
开启线程去执行人的时候会在这个方法里面阻塞,里面有很多空循环的for,除非shundown才能break 到最后会执行cleanup();
来实现对selector的关闭。
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { if (logger.isErrorEnabled()) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " + "be called before run() implementation terminates."); } } try { // Run all remaining tasks and shutdown hooks. At this point the event loop // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for // graceful shutdown with quietPeriod. for (;;) { if (confirmShutdown()) { break; } } // Now we want to make sure no more tasks can be added from this point. This is // achieved by switching the state. Any new tasks beyond this point will be rejected. for (;;) { int oldState = state; if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) { break; } } // We have the final set of tasks in the queue now, no more can be added, run all remaining. // No need to loop here, this is the final pass. confirmShutdown(); } finally { // 关闭selector try { cleanup(); } finally { // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify // the future. The user may block on the future and once it unblocks the JVM may terminate // and start unloading classes. // See https://github.com/netty/netty/issues/6596. FastThreadLocal.removeAll(); STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.countDown(); int numUserTasks = drainTasks(); if (numUserTasks > 0 && logger.isWarnEnabled()) { logger.warn("An event executor terminated with " + "non-empty task queue (" + numUserTasks + ')'); } terminationFuture.setSuccess(null); } } } } }); }
以上是关于netty源码之关闭服务的主要内容,如果未能解决你的问题,请参考以下文章