netty源码之关闭服务
Posted better_hui
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty源码之关闭服务相关的知识,希望对你有一定的参考价值。
目录
1、进入workerGroup.shutdownGracefully()
一、前言
这里的关闭包含了boss/worker两个线程,是netty提供的优雅退出。
二、客户端的关闭服务
1、进入workerGroup.shutdownGracefully()
不断跟进源码,进入到了SingleThreadEventExecutor的shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)方法中
//注意这里的关闭是设置状态位 @Override public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod"); if (timeout < quietPeriod) throw new IllegalArgumentException( "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))"); ObjectUtil.checkNotNull(unit, "unit"); if (isShuttingDown()) return terminationFuture(); boolean inEventLoop = inEventLoop(); boolean wakeup; int oldState; for (;;) if (isShuttingDown()) return terminationFuture(); int newState; wakeup = true; oldState = state; if (inEventLoop) newState = ST_SHUTTING_DOWN; else switch (oldState) case ST_NOT_STARTED: case ST_STARTED://如果原来是启动的状态 newState = ST_SHUTTING_DOWN;//设置为正在关闭状态 break; default: newState = oldState; wakeup = false; // CAS更改状态 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) break; gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); gracefulShutdownTimeout = unit.toNanos(timeout); if (ensureThreadStarted(oldState)) return terminationFuture; if (wakeup) taskQueue.offer(WAKEUP_TASK); if (!addTaskWakesUp) wakeup(inEventLoop); return terminationFuture();
2、NioEventLoop.run() 关闭的入口
3、closeAll()
4、confirmShutdown()方法
跟踪源码,进入SingleThreadEventExecutor
的confirmShutdown()
方法
protected boolean confirmShutdown() if (!isShuttingDown()) return false; if (!inEventLoop()) throw new IllegalStateException("must be invoked from an event loop"); // 取消所有的定时任务 cancelScheduledTasks(); if (gracefulShutdownStartTime == 0) // 计算关闭的任务开始的时间 gracefulShutdownStartTime = ScheduledFutureTask.nanoTime(); // 如果有task/hook则不关闭, if (runAllTasks() || runShutdownHooks()) if (isShutdown()) // Executor shut down - no new tasks anymore. return true; // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or // terminate if the quiet period is 0. // See https://github.com/netty/netty/issues/4241 if (gracefulShutdownQuietPeriod == 0) return true; taskQueue.offer(WAKEUP_TASK); return false; final long nanoTime = ScheduledFutureTask.nanoTime(); // 比较是否超过了可以容忍的关闭的最大时间 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) return true; // 关闭任务的静默期内,通过lastExecutionTime判断是否有任务执行 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) // Check if any tasks were added to the queue every 100ms. // TODO: Change the behavior of takeTask() so that it returns on timeout. taskQueue.offer(WAKEUP_TASK); try Thread.sleep(100);// 如果有任务就sleep,让其不关闭 catch (InterruptedException e) // Ignore return false; // 静默期内没有任务执行,马上关闭 // No tasks were added for last quiet period - hopefully safe to shut down. // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.) return true;
5、cleanup() 关闭selector
最后还有一件很重要的事,SingleThreadEventExecutor
开启线程去执行人的时候会在这个方法里面阻塞,里面有很多空循环的for,除非shundown才能break 到最后会执行cleanup();
来实现对selector的关闭。
private void doStartThread() assert thread == null; executor.execute(new Runnable() @Override public void run() thread = Thread.currentThread(); if (interrupted) thread.interrupt(); boolean success = false; updateLastExecutionTime(); try SingleThreadEventExecutor.this.run(); success = true; catch (Throwable t) logger.warn("Unexpected exception from an event executor: ", t); finally for (;;) int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) break; // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) if (logger.isErrorEnabled()) logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " + "be called before run() implementation terminates."); try // Run all remaining tasks and shutdown hooks. At this point the event loop // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for // graceful shutdown with quietPeriod. for (;;) if (confirmShutdown()) break; // Now we want to make sure no more tasks can be added from this point. This is // achieved by switching the state. Any new tasks beyond this point will be rejected. for (;;) int oldState = state; if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) break; // We have the final set of tasks in the queue now, no more can be added, run all remaining. // No need to loop here, this is the final pass. confirmShutdown(); finally // 关闭selector try cleanup(); finally // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify // the future. The user may block on the future and once it unblocks the JVM may terminate // and start unloading classes. // See https://github.com/netty/netty/issues/6596. FastThreadLocal.removeAll(); STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.countDown(); int numUserTasks = drainTasks(); if (numUserTasks > 0 && logger.isWarnEnabled()) logger.warn("An event executor terminated with " + "non-empty task queue (" + numUserTasks + ')'); terminationFuture.setSuccess(null); );
以上是关于netty源码之关闭服务的主要内容,如果未能解决你的问题,请参考以下文章