Netty源码分析-SingleThreadEventExecutor

Posted 征服.刘华强

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty源码分析-SingleThreadEventExecutor相关的知识,希望对你有一定的参考价值。

 

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.util.concurrent;

import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.ThreadExecutorMap;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
 * Abstract base class for @link OrderedEventExecutor's that execute all its submitted tasks in a single thread.
 *
 */
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor 

    static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);

    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTTING_DOWN = 3;
    private static final int ST_SHUTDOWN = 4;
    private static final int ST_TERMINATED = 5;

    private static final Runnable NOOP_TASK = new Runnable() 
        @Override
        public void run() 
            // Do nothing.
        
    ;

    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
    private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(
                    SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");

    //普通任务队列,java阻塞对象
    private final Queue<Runnable> taskQueue;

    //工作线程
    private volatile Thread thread;

    //线程属性,优先级,守护线程,名称,id啥的。
    @SuppressWarnings("unused")
    private volatile ThreadProperties threadProperties;

    //每个任务创建一个线程去执行
    // class ThreadPerTaskExecutor implements Executor 
    //     public void execute(Runnable r) 
    //         new Thread(r).start();
    //     
    // 
    private final Executor executor;

    //线程中断状态
    private volatile boolean interrupted;

    //信号锁
    private final CountDownLatch threadLock = new CountDownLatch(1);

    //存储关闭时的任务集
    private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
    private final boolean addTaskWakesUp;

    //队列容量
    private final int maxPendingTasks;

    //队列溢出后的处理方式-默认throw new RejectedExecutionException();
    private final RejectedExecutionHandler rejectedExecutionHandler;

    //最后一次执行任务的
    private long lastExecutionTime;

    @SuppressWarnings( "FieldMayBeFinal", "unused" )
    //当前状态-默认未启动
    private volatile int state = ST_NOT_STARTED;

    private volatile long gracefulShutdownQuietPeriod;
    private volatile long gracefulShutdownTimeout;
    private long gracefulShutdownStartTime;

    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);

    /**
     * Create a new instance
     *
     * @param parent            the @link EventExecutorGroup which is the parent of this instance and belongs to it
     * @param threadFactory     the @link ThreadFactory which will be used for the used @link Thread
     * @param addTaskWakesUp    @code true if and only if invocation of @link #addTask(Runnable) will wake up the
     *                          executor thread
     */
    protected SingleThreadEventExecutor(
            EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) 
        this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
    

    /**
     * Create a new instance
     *
     * @param parent            the @link EventExecutorGroup which is the parent of this instance and belongs to it
     * @param threadFactory     the @link ThreadFactory which will be used for the used @link Thread
     * @param addTaskWakesUp    @code true if and only if invocation of @link #addTask(Runnable) will wake up the
     *                          executor thread
     * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
     * @param rejectedHandler   the @link RejectedExecutionHandler to use.
     */
    protected SingleThreadEventExecutor(
            EventExecutorGroup parent, ThreadFactory threadFactory,
            boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) 
        this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
    

    /**
     * Create a new instance
     *
     * @param parent            the @link EventExecutorGroup which is the parent of this instance and belongs to it
     * @param executor          the @link Executor which will be used for executing
     * @param addTaskWakesUp    @code true if and only if invocation of @link #addTask(Runnable) will wake up the
     *                          executor thread
     */
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) 
        this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
    

    /**
     * Create a new instance
     *
     * @param parent            the @link EventExecutorGroup which is the parent of this instance and belongs to it
     * @param executor          the @link Executor which will be used for executing
     * @param addTaskWakesUp    @code true if and only if invocation of @link #addTask(Runnable) will wake up the
     *                          executor thread
     * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
     * @param rejectedHandler   the @link RejectedExecutionHandler to use.
     */
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) 
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ThreadExecutorMap.apply(executor, this);
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                        RejectedExecutionHandler rejectedHandler) 
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        this.executor = ThreadExecutorMap.apply(executor, this);
        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
        this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    

    /**
     * @deprecated Please use and override @link #newTaskQueue(int).
     */
    @Deprecated
    protected Queue<Runnable> newTaskQueue() 
        return newTaskQueue(maxPendingTasks);
    

    /**
     * Create a new @link Queue which will holds the tasks to execute. This default implementation will return a
     * @link LinkedBlockingQueue but if your sub-class of @link SingleThreadEventExecutor will not do any blocking
     * calls on the this @link Queue it may make sense to @code @Override this and return some more performant
     * implementation that does not support blocking operations at all.
     */
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) 
        return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
    

    /**
     * Interrupt the current running @link Thread.
     */
    //如果线程为空,则设值标记变量
    protected void interruptThread() 
        Thread currentThread = thread;
        if (currentThread == null) 
            interrupted = true;
         else 
            currentThread.interrupt();
        
    

    /**
     * @see Queue#poll()
     */
    //从普通任务队列当中获取task
    protected Runnable pollTask() 
        assert inEventLoop();
        return pollTaskFrom(taskQueue);
    

    //循环获取,忽略WAKEUP_TASK
    protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) 
        for (;;) 
            Runnable task = taskQueue.poll();
            if (task != WAKEUP_TASK) 
                return task;
            
        
    

    /**
     * Take the next @link Runnable from the task queue and so will block if no task is currently present.
     * <p>
     * Be aware that this method will throw an @link UnsupportedOperationException if the task queue, which was
     * created via @link #newTaskQueue(), does not implement @link BlockingQueue.
     * </p>
     *
     * @return @code null if the executor thread has been interrupted or waken up.
     */
    protected Runnable takeTask() 
        assert inEventLoop();
        if (!(taskQueue instanceof BlockingQueue)) 
            throw new UnsupportedOperationException();
        

        BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
        for (;;) 
            //先从优先级队列中获取一个task
            ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
            //如果优先级队列为空,则直接在普通队列上take,take方法会阻塞
            if (scheduledTask == null) 
                Runnable task = null;
                try 
                    //阻塞获取task
                    task = taskQueue.take();
                    if (task == WAKEUP_TASK) 
                        task = null;
                    
                 catch (InterruptedException e) 
                    // Ignore
                
                //如果不为空则返回任务
                return task;
             else 
                //如果优先级队列内部存在task,则获取它的到期执行时间
                long delayNanos = scheduledTask.delayNanos();
                Runnable task = null;
                if (delayNanos > 0) 
                    try 
                        //让线程阻塞到有序队列的到期时间,则唤起线程。
                        //在这期间,如果有其它线程往普通队列里加入task,则线程也会唤起。
                        task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                     catch (InterruptedException e) 
                        // Waken up.
                        return null;
                    
                
                if (task == null) 
                    // We need to fetch the scheduled tasks now as otherwise there may be a chance that
                    // scheduled tasks are never executed if there is always one task in the taskQueue.
                    // This is for example true for the read task of OIO Transport
                    // See https://github.com/netty/netty/issues/1614
                    //到达唤起时间,则把有序队列当中的任务全部加入到普通队里
                    fetchFromScheduledTaskQueue();
                    //然后从普通队列中获取任务执行
                    task = taskQueue.poll();
                

                if (task != null) 
                    return task;
                
            
        
    

    private boolean fetchFromScheduledTaskQueue() 
        if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) 
            return true;
        
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        for (;;) 
            //从优先级队列中获取到执行期的元素
            Runnable scheduledTask = pollScheduledTask(nanoTime);
            if (scheduledTask == null) 
                return true;
            
            //把它加入到普通队列
            if (!taskQueue.offer(scheduledTask)) 
                // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
                scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            
        
    

    /**
     * @return @code true if at least one scheduled task was executed.
     */
    private boolean executeExpiredScheduledTasks() 
        if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) 
            return false;
        
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        if (scheduledTask == null) 
            return false;
        
        do 
            safeExecute(scheduledTask);
         while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
        return true;
    

    /**
     * @see Queue#peek()
     */
    protected Runnable peekTask() 
        assert inEventLoop();
        return taskQueue.peek();
    

    /**
     * @see Queue#isEmpty()
     */
    //判断是否有普通任务
    protected boolean hasTasks() 
        assert inEventLoop();
        return !taskQueue.isEmpty();
    

    /**
     * Return the number of tasks that are pending for processing.
     *
     * <strong>Be aware that this operation may be expensive as it depends on the internal implementation of the
     * SingleThreadEventExecutor. So use it with care!</strong>
     */
    public int pendingTasks() 
        return taskQueue.size();
    

    /**
     * Add a task to the task queue, or throws a @link RejectedExecutionException if this instance was shutdown
     * before.
     */
    protected void addTask(Runnable task) 
        ObjectUtil.checkNotNull(task, "task");
        if (!offerTask(task)) 
            reject(task);
        
    

    final boolean offerTask(Runnable task) 
        if (isShutdown()) 
            reject();
        
        return taskQueue.offer(task);
    

    /**
     * @see Queue#remove(Object)
     */
    protected boolean removeTask(Runnable task) 
        return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
    

    /**
     * Poll all tasks from the task queue and run them via @link Runnable#run() method.
     *
     * @return @code true if and only if at least one task was run
     */
    protected boolean runAllTasks() 
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;

        do 
            //把优先级队列里面到期需要执行的任务,加入到普通队列
            fetchedAll = fetchFromScheduledTaskQueue();
            //把普通队列全部执行
            if (runAllTasksFrom(taskQueue)) 
                ranAtLeastOne = true;
            
         while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

        //记录最后一次任务结束时间
        if (ranAtLeastOne) 
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        
        //子类可以重写,相当于事件。
        afterRunningAllTasks();
        return ranAtLeastOne;
    

    /**
     * Execute all expired scheduled tasks and all current tasks in the executor queue until both queues are empty,
     * or @code maxDrainAttempts has been exceeded.
     * @param maxDrainAttempts The maximum amount of times this method attempts to drain from queues. This is to prevent
     *                         continuous task execution and scheduling from preventing the EventExecutor thread to
     *                         make progress and return to the selector mechanism to process inbound I/O events.
     * @return @code true if at least one task was run.
     */
    protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) 
        assert inEventLoop();
        boolean ranAtLeastOneTask;
        int drainAttempt = 0;
        do 
            // We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued
            // here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe.
            ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
         while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);

        if (drainAttempt > 0) 
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        
        afterRunningAllTasks();

        return drainAttempt > 0;
    

    /**
     * Runs all tasks from the passed @code taskQueue.
     *
     * @param taskQueue To poll and execute all tasks.
     *
     * @return @code true if at least one task was executed.
     */
    protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) 
        //先拿出一个
        Runnable task = pollTaskFrom(taskQueue);
        if (task == null) 
            return false;
        
        for (;;) 
            //运行任务忽略异常
            safeExecute(task);
            //继续拿。全部执行完毕
            task = pollTaskFrom(taskQueue);
            if (task == null) 
                return true;
            
        
    

    /**
     * What ever tasks are present in @code taskQueue when this method is invoked will be @link Runnable#run().
     * @param taskQueue the task queue to drain.
     * @return @code true if at least @link Runnable#run() was called.
     */
    private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) 
        Runnable task = pollTaskFrom(taskQueue);//先获取一个任务
        if (task == null) 
            return false;
        
        //取最小值
        int remaining = Math.min(maxPendingTasks, taskQueue.size());
        //执行任务
        safeExecute(task);
        // Use taskQueue.poll() directly rather than pollTaskFrom() since the latter may
        // silently consume more than one item from the queue (skips over WAKEUP_TASK instances)
        //运行次数不超过remaining,并且不会忽略WAKEUP_TASK类型的任务
        while (remaining-- > 0 && (task = taskQueue.poll()) != null) 
            safeExecute(task);//执行任务
        
        return true;
    

    /**
     * Poll all tasks from the task queue and run them via @link Runnable#run() method.  This method stops running
     * the tasks in the task queue and returns if it ran longer than @code timeoutNanos.
     */
    protected boolean runAllTasks(long timeoutNanos) 
        //把优先级队列中到期的任务抓取到普通队列
        fetchFromScheduledTaskQueue();

        //获取普通多列当中任务
        Runnable task = pollTask();
        if (task == null) 
            //事件
            afterRunningAllTasks();
            return false;
        

        //设置允许执行的时间
        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;


        long runTasks = 0;
        long lastExecutionTime;
        for (;;) 
            //执行任务
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0)  //每64个任务检查一下时间
                //系统当前时间如果超过了允许执行的时间,则结束循环,让EventLoop线程去干其他事情
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) 
                    break;
                
            

            //执行任务
            task = pollTask();
            if (task == null) 
                //如果task为空,记录最后一个任务的执行时间
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            
        
        //事件
        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    

    /**
     * Invoked before returning from @link #runAllTasks() and @link #runAllTasks(long).
     */
    @UnstableApi
    protected void afterRunningAllTasks()  

    /**
     * Returns the amount of time left until the scheduled task with the closest dead line is executed.
     */
    protected long delayNanos(long currentTimeNanos) 
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null) 
            return SCHEDULE_PURGE_INTERVAL;
        

        return scheduledTask.delayNanos(currentTimeNanos);
    

    /**
     * Returns the absolute point in time (relative to @link #nanoTime()) at which the the next
     * closest scheduled task should run.
     */
    @UnstableApi
    protected long deadlineNanos() 
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null) 
            return nanoTime() + SCHEDULE_PURGE_INTERVAL;
        
        return scheduledTask.deadlineNanos();
    

    /**
     * Updates the internal timestamp that tells when a submitted task was executed most recently.
     * @link #runAllTasks() and @link #runAllTasks(long) updates this timestamp automatically, and thus there's
     * usually no need to call this method.  However, if you take the tasks manually using @link #takeTask() or
     * @link #pollTask(), you have to call this method at the end of task execution loop for accurate quiet period
     * checks.
     */
    protected void updateLastExecutionTime() 
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    

    /**
     * Run the tasks in the @link #taskQueue
     */
    protected abstract void run();

    /**
     * Do nothing, sub-classes may override
     */
    protected void cleanup() 
        // NOOP
    

    protected void wakeup(boolean inEventLoop) 
        if (!inEventLoop) 
            // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
            // is already something in the queue.
            taskQueue.offer(WAKEUP_TASK);
        
    

    @Override
    public boolean inEventLoop(Thread thread) 
        return thread == this.thread;
    

    /**
     * Add a @link Runnable which will be executed on shutdown of this instance
     */
    //添加关闭时的任务
    public void addShutdownHook(final Runnable task) 
        if (inEventLoop()) 
            //在EventLoop直接添加
            shutdownHooks.add(task);
         else 
            //不在EventLoop创建任务添加
            execute(new Runnable() 
                @Override
                public void run() 
                    shutdownHooks.add(task);
                
            );
        
    

    /**
     * Remove a previous added @link Runnable as a shutdown hook
     */
    //移除关闭任务
    public void removeShutdownHook(final Runnable task) 
        if (inEventLoop()) 
            shutdownHooks.remove(task);
         else 
            execute(new Runnable() 
                @Override
                public void run() 
                    shutdownHooks.remove(task);
                
            );
        
    

    //执行关闭任务
    private boolean runShutdownHooks() 
        boolean ran = false;
        // Note shutdown hooks can add / remove shutdown hooks.
        while (!shutdownHooks.isEmpty()) 
            List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
            shutdownHooks.clear();
            for (Runnable task: copy) 
                try 
                    task.run();
                 catch (Throwable t) 
                    logger.warn("Shutdown hook raised an exception.", t);
                 finally 
                    ran = true;
                
            
        

        if (ran) 
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        

        return ran;
    

    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) 
        ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
        if (timeout < quietPeriod) 
            throw new IllegalArgumentException(
                    "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
        
        ObjectUtil.checkNotNull(unit, "unit");

        if (isShuttingDown()) 
            return terminationFuture();
        

        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        for (;;) 
            if (isShuttingDown()) 
                return terminationFuture();
            
            int newState;
            wakeup = true;
            oldState = state;
            if (inEventLoop) 
                newState = ST_SHUTTING_DOWN;
             else 
                switch (oldState) 
                    case ST_NOT_STARTED:
                    case ST_STARTED:
                        newState = ST_SHUTTING_DOWN;
                        break;
                    default:
                        newState = oldState;
                        wakeup = false;
                
            
            if (STATE_UPDATER.compareAndSet(this, oldState, newState)) 
                break;
            
        
        gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
        gracefulShutdownTimeout = unit.toNanos(timeout);

        if (ensureThreadStarted(oldState)) 
            return terminationFuture;
        

        if (wakeup) 
            taskQueue.offer(WAKEUP_TASK);
            if (!addTaskWakesUp) 
                wakeup(inEventLoop);
            
        

        return terminationFuture();
    

    @Override
    public Future<?> terminationFuture() 
        return terminationFuture;
    

    @Override
    @Deprecated
    public void shutdown() 
        if (isShutdown()) 
            return;
        

        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        for (;;) 
            if (isShuttingDown()) 
                return;
            
            int newState;
            wakeup = true;

            //当前状态
            oldState = state;
            if (inEventLoop)   //在EventLoop线程中,说明还没关闭,说以直接设置为ST_SHUTDOWN
                //关闭状态
                newState = ST_SHUTDOWN;
             else 
                switch (oldState) 
                    //不在EventLoop线程中

                    //未启动,已经启动,关闭中
                    case ST_NOT_STARTED:
                    case ST_STARTED:
                    case ST_SHUTTING_DOWN:
                        //设置为关闭
                        newState = ST_SHUTDOWN;
                        break;
                    default:
                        //状态不变
                        newState = oldState;
                        wakeup = false;
                
            

            //把状态设置为 newState = ST_SHUTDOWN;
            if (STATE_UPDATER.compareAndSet(this, oldState, newState)) 
                break;
            
        

        if (ensureThreadStarted(oldState)) 
            return;
        

        if (wakeup) 
            taskQueue.offer(WAKEUP_TASK);
            if (!addTaskWakesUp) 
                wakeup(inEventLoop);
            
        
    

    //是否关闭中
    @Override
    public boolean isShuttingDown() 
        return state >= ST_SHUTTING_DOWN;
    

    //是否已经关闭
    @Override
    public boolean isShutdown() 
        return state >= ST_SHUTDOWN;
    

    //是否已经终止
    @Override
    public boolean isTerminated() 
        return state == ST_TERMINATED;
    

    /**
     * Confirm that the shutdown if the instance should be done now!
     */
    //判断是否关闭
    protected boolean confirmShutdown() 
        if (!isShuttingDown()) 
            return false;//没关闭
        

        //必须在EventLoop线程
        if (!inEventLoop()) 
            throw new IllegalStateException("must be invoked from an event loop");
        

        //把优先级队列全部清空
        cancelScheduledTasks();

        if (gracefulShutdownStartTime == 0) 
            gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
        

        //把任务全部执行完毕
        if (runAllTasks() || runShutdownHooks()) 
            if (isShutdown()) 
                // Executor shut down - no new tasks anymore.
                //已经关闭,没有任何新的任务
                return true;
            

            // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
            // terminate if the quiet period is 0.
            // See https://github.com/netty/netty/issues/4241
            if (gracefulShutdownQuietPeriod == 0) 
                return true;
            
            taskQueue.offer(WAKEUP_TASK);
            return false;
        

        final long nanoTime = ScheduledFutureTask.nanoTime();

        if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) 
            return true;
        

        if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) 
            // Check if any tasks were added to the queue every 100ms.
            // TODO: Change the behavior of takeTask() so that it returns on timeout.
            taskQueue.offer(WAKEUP_TASK);
            try 
                Thread.sleep(100);
             catch (InterruptedException e) 
                // Ignore
            

            return false;
        

        // No tasks were added for last quiet period - hopefully safe to shut down.
        // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
        return true;
    

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException 
        ObjectUtil.checkNotNull(unit, "unit");
        if (inEventLoop()) 
            throw new IllegalStateException("cannot await termination of the current thread");
        

        threadLock.await(timeout, unit);

        return isTerminated();
    

    @Override
    public void execute(Runnable task) 
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    

    @Override
    public void lazyExecute(Runnable task) 
        execute(ObjectUtil.checkNotNull(task, "task"), false);
    

    private void execute(Runnable task, boolean immediate) 
        boolean inEventLoop = inEventLoop();
        //把任务添加到队列
        addTask(task);
        //不在EventLoop线程
        if (!inEventLoop) 
            //尝试启动工作线程,启动成功的话就会死循环不断地执行任务
            startThread();
            //如果startThread方法返回,一种情况是已经启动,或者是关闭状态
            if (isShutdown()) 
                //已经关闭
                boolean reject = false;
                try 
                    //删除这个任务
                    if (removeTask(task)) 
                        reject = true;
                    
                 catch (UnsupportedOperationException e) 
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                
                //如果删除成功,抛出拒绝异常
                if (reject) 
                    reject();
                
            
        

        //决定是否唤起线程
        if (!addTaskWakesUp && immediate) 
            wakeup(inEventLoop);
        
    

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException 
        throwIfInEventLoop("invokeAny");
        return super.invokeAny(tasks);
    

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException 
        throwIfInEventLoop("invokeAny");
        return super.invokeAny(tasks, timeout, unit);
    

    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException 
        throwIfInEventLoop("invokeAll");
        return super.invokeAll(tasks);
    

    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException 
        throwIfInEventLoop("invokeAll");
        return super.invokeAll(tasks, timeout, unit);
    

    private void throwIfInEventLoop(String method) 
        if (inEventLoop()) 
            throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
        
    

    /**
     * Returns the @link ThreadProperties of the @link Thread that powers the @link SingleThreadEventExecutor.
     * If the @link SingleThreadEventExecutor is not started yet, this operation will start it and block until
     * it is fully started.
     */
    public final ThreadProperties threadProperties() 
        ThreadProperties threadProperties = this.threadProperties;
        if (threadProperties == null) 
            Thread thread = this.thread;
            if (thread == null) 
                assert !inEventLoop();
                submit(NOOP_TASK).syncUninterruptibly();
                thread = this.thread;
                assert thread != null;
            

            threadProperties = new DefaultThreadProperties(thread);
            if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) 
                threadProperties = this.threadProperties;
            
        

        return threadProperties;
    

    /**
     * @deprecated use @link AbstractEventExecutor.LazyRunnable
     */
    @Deprecated
    protected interface NonWakeupRunnable extends LazyRunnable  

    /**
     * Can be overridden to control which tasks require waking the @link EventExecutor thread
     * if it is waiting so that they can be run immediately.
     */
    protected boolean wakesUpForTask(Runnable task) 
        return true;
    

    protected static void reject() 
        throw new RejectedExecutionException("event executor terminated");
    

    /**
     * Offers the task to the associated @link RejectedExecutionHandler.
     *
     * @param task to reject.
     */
    protected final void reject(Runnable task) 
        rejectedExecutionHandler.rejected(task, this);
    

    // ScheduledExecutorService implementation

    private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);

    //启动线程
    private void startThread() 
        //必须是未启动状态,如果关闭中,或者关闭了都不会再次启动
        if (state == ST_NOT_STARTED) 
            //把状态由ST_NOT_STARTED设置为ST_STARTED,原子方式更新,只会有一个线程操作成功
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) 
                boolean success = false;
                try 
                    //启动线程
                    doStartThread();
                    success = true;
                 finally 
                    //如果启动失败,则把状态还原成ST_NOT_STARTED
                    if (!success) 
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    
                
            
        
    

    private boolean ensureThreadStarted(int oldState) 
        //如果线程未启动
        if (oldState == ST_NOT_STARTED) 
            try 
                //则启动一次,把里面任务运行干净
                doStartThread();
             catch (Throwable cause) 
                STATE_UPDATER.set(this, ST_TERMINATED);
                terminationFuture.tryFailure(cause);

                if (!(cause instanceof Exception)) 
                    // Also rethrow as it may be an OOME for example
                    PlatformDependent.throwException(cause);
                
                return true;
            
        
        return false;
    

    private void doStartThread() 
        assert thread == null;
        //启动的动作封装成一个任务,executor会启动一个新的线程去执行
        executor.execute(new Runnable() 
            @Override
            public void run() 
                //现在中断状态
                thread = Thread.currentThread();
                if (interrupted) 
                    thread.interrupt();
                

                //更新最后执行时间
                boolean success = false;
                updateLastExecutionTime();
                try 
                    //需要子类实现具体逻辑
                    SingleThreadEventExecutor.this.run();
                    success = true;
                 catch (Throwable t) 
                    logger.warn("Unexpected exception from an event executor: ", t);
                 finally 
                    for (;;) 
                        int oldState = state;
                        //如果run方法结束,才会执行这里,等待状态变为》=ST_SHUTTING_DOWN时结束方法
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) 
                            break;
                        
                    

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) 
                        if (logger.isErrorEnabled()) 
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates.");
                        
                    

                    try 
                        // Run all remaining tasks and shutdown hooks. At this point the event loop
                        // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
                        // graceful shutdown with quietPeriod.
                        for (;;) 
                            //等待所有任务执行完毕,所有结束钩子任务执行完毕
                            if (confirmShutdown()) 
                                break;
                            
                        

                        // Now we want to make sure no more tasks can be added from this point. This is
                        // achieved by switching the state. Any new tasks beyond this point will be rejected.
                        //设置为结束状态
                        for (;;) 
                            int oldState = state;
                            if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) 
                                break;
                            
                        

                        // We have the final set of tasks in the queue now, no more can be added, run all remaining.
                        // No need to loop here, this is the final pass.
                        confirmShutdown();
                     finally 
                        try 
                            //释放资源
                            cleanup();
                         finally 
                            // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                            // the future. The user may block on the future and once it unblocks the JVM may terminate
                            // and start unloading classes.
                            // See https://github.com/netty/netty/issues/6596.
                            FastThreadLocal.removeAll();

                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.countDown();
                            int numUserTasks = drainTasks();
                            if (numUserTasks > 0 && logger.isWarnEnabled()) 
                                logger.warn("An event executor terminated with " +
                                        "non-empty task queue (" + numUserTasks + ')');
                            
                            terminationFuture.setSuccess(null);
                        
                    
                
            
        );
    

    final int drainTasks() 
        int numTasks = 0;
        for (;;) 
            Runnable runnable = taskQueue.poll();
            if (runnable == null) 
                break;
            
            // WAKEUP_TASK should be just discarded as these are added internally.
            // The important bit is that we not have any user tasks left.
            if (WAKEUP_TASK != runnable) 
                numTasks++;
            
        
        return numTasks;
    

    private static final class DefaultThreadProperties implements ThreadProperties 
        private final Thread t;

        DefaultThreadProperties(Thread t) 
            this.t = t;
        

        @Override
        public State state() 
            return t.getState();
        

        @Override
        public int priority() 
            return t.getPriority();
        

        @Override
        public boolean isInterrupted() 
            return t.isInterrupted();
        

        @Override
        public boolean isDaemon() 
            return t.isDaemon();
        

        @Override
        public String name() 
            return t.getName();
        

        @Override
        public long id() 
            return t.getId();
        

        @Override
        public StackTraceElement[] stackTrace() 
            return t.getStackTrace();
        

        @Override
        public boolean isAlive() 
            return t.isAlive();
        
    

 

以上是关于Netty源码分析-SingleThreadEventExecutor的主要内容,如果未能解决你的问题,请参考以下文章

源码分析Netty4专栏

源码分析Netty4专栏

Netty-源码分析LineBasedFrameDecoder

Netty源码分析:read

Netty源码分析:read

[Netty源码分析]ByteBuf(一)