Tomcat监听并接受Socket连接流程

Posted 算法技术猿

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Tomcat监听并接受Socket连接流程相关的知识,希望对你有一定的参考价值。

tomcat建立Socket监听TCP连接,获取连接后委托SocketProcessor处理器处理后续流程

流程:

1、启动tomcat 2、启动tomcat完成后调用NioEndpoint的startInternal()方法初始化操作 3、createExecutor():初始化创建10个工作线程的线程池,此线程池最大只能创建200个线程,最小空闲工作线程为10 4、设置tomcat并发最大连接数为10000 5、创建并运行2个Poller守护线程,其作用是将Acceptor接收的连接请求委托给SocketProcessor处理器 6、startAcceptorThreads():创建一个Acceptor线程对象,用于监听网络TCP/IP连接,并将Acceptor线程对象作为一个守护线程启动,启动线程后就一直做死循环监听TCP连接 7、如果当前连接数大于10000,则等待其他连接释放,如果当前连接数小于10000,则继续后续操作 8、socket = serverSock.accept()等待TCP网络请求进来 9、socket = serverSock.accept();有返回值时,说明有一个TCP连接进来,然后执行setSocketOptions(),将当前连接注册到Poller中 10、getPoller0().register(channel);由于Poller中的两个线程一直在运行,所以当channel注册到Poller中时,Poller中的selector.select()就会得到当前连接 11、Poller.run()方法中,当selector.select()有返回值时,循环遍历每个值SelectionKey,然后调用processKey(SelectionKey,attachment) 12、processKey()方法中调用processSocket()方法,创建一个SocketProcessorBase线程对象处理器来处理连接请求 13、从工作线程池中获取一个工作线程来运行SocketProcessorBase线程对象,其具体实现在NioEndpoint的内部类SocketProcessor.doRun()

NioEndpoint源码分析:

tomcat容器启动成功后,会开启一个NIO端点,创建含有10个工作线程(http-nio-8080-exec-xxx)的线程池,最大并发连接数为10000,然后创建大小为2的Poller线程对象数组,并启动数组中的2个守护线程,然后启动一个Acceptor线程监听网络TCP/IP的连接请求,监听到请求后,注册到Poller线程对象中,Poller线程对象委托SocketProcessor处理请求 startInternal()源码对应上述流程第1-5步:

 
   
   
 
  1. /**

  2. * Start the NIO endpoint, creating acceptor, poller threads.

  3. */

  4. @Override

  5. public void startInternal() throws Exception {

  6.    if (!running) {

  7.        //启动Acceptor线程时,会判断running和paused 状态

  8.        running = true;

  9.        paused = false;

  10.        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,

  11.                socketProperties.getProcessorCache());

  12.        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,

  13.                        socketProperties.getEventCache());

  14.        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,

  15.                socketProperties.getBufferPool());

  16.        // Create worker collection

  17.        //创建10个工作线程

  18.        if ( getExecutor() == null ) {

  19.            createExecutor();

  20.        }

  21.        //初始化最大并发连接数为10000

  22.        initializeConnectionLatch();

  23.        // Start poller threads

  24.        //创建有2个守护线程的poller数组,并且启动这2个守护线程

  25.        pollers = new Poller[getPollerThreadCount()];

  26.        for (int i=0; i<pollers.length; i++) {

  27.            pollers[i] = new Poller();

  28.            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);

  29.            pollerThread.setPriority(threadPriority);

  30.            pollerThread.setDaemon(true);

  31.            pollerThread.start();

  32.        }

  33.        //启动Acceptor线程,监听网络TCP/IP的连接请求

  34.        startAcceptorThreads();

  35.    }

  36. }

创建Acceptor线程:startAcceptorThreads() 对应上述流程第6步

 
   
   
 
  1. protected final void startAcceptorThreads() {

  2.    //创建一个线程数为1的Acceptor的数组

  3.    int count = getAcceptorThreadCount();

  4.    acceptors = new Acceptor[count];

  5.    for (int i = 0; i < count; i++) {

  6.        //创建Acceptor监听网络TCP/IP请求,具体实现在NioEndpoint中

  7.        acceptors[i] = createAcceptor();

  8.        String threadName = getName() + "-Acceptor-" + i;

  9.        acceptors[i].setThreadName(threadName);

  10.        Thread t = new Thread(acceptors[i], threadName);

  11.        t.setPriority(getAcceptorThreadPriority());

  12.        t.setDaemon(getDaemon());

  13.        t.start();

  14.    }

  15. }

Acceptor线程启动逻辑:对应上述流程第7-9步

 
   
   
 
  1. /**

  2. * 是一个监听网络TCP/IP连接的后台线程,接收到连接后会交给合适的处理器(SocketProcessor/ConnectionHandler)处理,Acceptor并非直接将连接请求交给SocketProcessor,而是经过了Poller

  3. * The background thread that listens for incoming TCP/IP connections and

  4. * hands them off to an appropriate processor.

  5. */

  6. protected class Acceptor extends AbstractEndpoint.Acceptor {

  7.    @Override

  8.    public void run() {

  9.        int errorDelay = 0;

  10.        // Loop until we receive a shutdown command

  11.        // 直到接受到关闭连接的命令,否则一直循环下去

  12.        while (running) {

  13.            // Loop if endpoint is paused

  14.            //如果处于暂定状态,则一直等到paused为false

  15.            while (paused && running) {

  16.                state = AcceptorState.PAUSED;

  17.                try {

  18.                    Thread.sleep(50);

  19.                } catch (InterruptedException e) {

  20.                    // Ignore

  21.                }

  22.            }

  23.            if (!running) {

  24.                break;

  25.            }

  26.            state = AcceptorState.RUNNING;

  27.            try {

  28.                //if we have reached max connections, wait

  29.                // 如果还没有到达最大连接数,则当前连接数量做加一操作,

  30.                // 如果到达最大连接数则让当前接收线程处于等待状态,直到有连接被释放

  31.                countUpOrAwaitConnection();

  32.                SocketChannel socket = null;

  33.                try {

  34.                    // Accept the next incoming connection from the server

  35.                    // socket

  36.                    //从服务器套接字serverSock接收下一个进入的连接请求,如果没有连接请求,则一直阻塞,直到有连接到来才继续后续的处理

  37.                    socket = serverSock.accept();

  38.                } catch (IOException ioe) {

  39.                    // We didn't get a socket

  40.                    countDownConnection();

  41.                    if (running) {

  42.                        // Introduce delay if necessary

  43.                        errorDelay = handleExceptionWithDelay(errorDelay);

  44.                        // re-throw

  45.                        throw ioe;

  46.                    } else {

  47.                        break;

  48.                    }

  49.                }

  50.                // Successful accept, reset the error delay

  51.                errorDelay = 0;

  52.                // Configure the socket

  53.                if (running && !paused) {

  54.                    // setSocketOptions() will hand the socket off to

  55.                    // an appropriate processor if successful

  56.                    // 将当前连接请求注册到Poller中

  57.                    if (!setSocketOptions(socket)) {

  58.                        //关闭socket,并将连接数-1

  59.                        closeSocket(socket);

  60.                    }

  61.                } else {

  62.                    //关闭socket,并将连接数-1

  63.                    closeSocket(socket);

  64.                }

  65.            } catch (Throwable t) {

  66.                ExceptionUtils.handleThrowable(t);

  67.                log.error(sm.getString("endpoint.accept.fail"), t);

  68.            }

  69.        }

  70.        state = AcceptorState.ENDED;

  71.    }

  72.    private void closeSocket(SocketChannel socket) {

  73.        countDownConnection();

  74.        try {

  75.            socket.socket().close();

  76.        } catch (IOException ioe)  {

  77.            if (log.isDebugEnabled()) {

  78.                log.debug(sm.getString("endpoint.err.close"), ioe);

  79.            }

  80.        }

  81.        try {

  82.            socket.close();

  83.        } catch (IOException ioe) {

  84.            if (log.isDebugEnabled()) {

  85.                log.debug(sm.getString("endpoint.err.close"), ioe);

  86.            }

  87.        }

  88.    }

  89. }

将TCP连接请求注册到Poller线程对象中,对应上述流程第10步

 
   
   
 
  1. /**

  2. * 处理指定的连接套接字

  3. * Process the specified connection.

  4. * @param socket The socket channel

  5. * @return <code>true</code> if the socket was correctly configured

  6. *  and processing may continue, <code>false</code> if the socket needs to be

  7. *  close immediately

  8. */

  9. protected boolean setSocketOptions(SocketChannel socket) {

  10.    // Process the connection

  11.    try {

  12.        //disable blocking, APR style, we are gonna be polling it

  13.        socket.configureBlocking(false);

  14.        Socket sock = socket.socket();

  15.        socketProperties.setProperties(sock);

  16.        NioChannel channel = nioChannels.pop();

  17.        if (channel == null) {

  18.            SocketBufferHandler bufhandler = new SocketBufferHandler(

  19.                    socketProperties.getAppReadBufSize(),

  20.                    socketProperties.getAppWriteBufSize(),

  21.                    socketProperties.getDirectBuffer());

  22.           if (isSSLEnabled()) {

  23.                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);

  24.            } else {

  25.                channel = new NioChannel(socket, bufhandler);

  26.            }

  27.        } else {

  28.            channel.setIOChannel(socket);

  29.            channel.reset();

  30.        }

  31.        //从Poller中获取一个线程来处理新接收到的请求连接,委托其完成相应处理,委托完成后,当前线程继续自己被设定的监听接收委托任务

  32.        getPoller0().register(channel);

  33.    } catch (Throwable t) {

  34.        ExceptionUtils.handleThrowable(t);

  35.        try {

  36.            log.error("",t);

  37.        } catch (Throwable tt) {

  38.            ExceptionUtils.handleThrowable(tt);

  39.        }

  40.        // Tell to close the socket

  41.        return false;

  42.    }

  43.    return true;

  44. }

Poller中定义有一个Selector(选择器),它是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件,一个单独的线程可以管理多个channel,从而管理多个网络连接。Poller线程开启后一直处于运行状态,待Poller中的selector.select()有返回值时,继续处理后续流程,否则一直阻塞,对应上述流程第11-12步

 
   
   
 
  1. @Override

  2. public void run() {

  3.    // Loop until destroy() is called

  4.    while (true) {

  5.        boolean hasEvents = false;

  6.        try {

  7.            if (!close) {

  8.                hasEvents = events();

  9.                //获取selector中的连接数量,只有keyCount大于0时,才处理后续流程,否则一直阻塞

  10.                if (wakeupCounter.getAndSet(-1) > 0) {

  11.                    //if we are here, means we have other stuff to do

  12.                    //do a non blocking select

  13.                    keyCount = selector.selectNow();

  14.                } else {

  15.                    keyCount = selector.select(selectorTimeout);

  16.                }

  17.                wakeupCounter.set(0);

  18.            }

  19.            if (close) {

  20.                events();

  21.                timeout(0, false);

  22.                try {

  23.                    selector.close();

  24.                } catch (IOException ioe) {

  25.                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);

  26.                }

  27.                break;

  28.            }

  29.        } catch (Throwable x) {

  30.            ExceptionUtils.handleThrowable(x);

  31.            log.error("",x);

  32.            continue;

  33.        }

  34.        //either we timed out or we woke up, process events first

  35.        if ( keyCount == 0 ) hasEvents = (hasEvents | events());

  36.        Iterator<SelectionKey> iterator =

  37.            keyCount > 0 ? selector.selectedKeys().iterator() : null;

  38.        // Walk through the collection of ready keys and dispatch

  39.        // any active event.

  40.        while (iterator != null && iterator.hasNext()) {

  41.            SelectionKey sk = iterator.next();

  42.            NiosocketWrapper attachment = (NioSocketWrapper)sk.attachment();

  43.            // Attachment may be null if another thread has called

  44.            // cancelledKey()

  45.            if (attachment == null) {

  46.                iterator.remove();

  47.            } else {

  48.                iterator.remove();

  49.                processKey(sk, attachment);

  50.            }

  51.        }//while

  52.        //process timeouts

  53.        timeout(keyCount,hasEvents);

  54.    }//while

  55.    getStopLatch().countDown();

  56. }

  57. protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {

  58.    try {

  59.        if ( close ) {

  60.            cancelledKey(sk);

  61.        } else if ( sk.isValid() && attachment != null ) {

  62.            if (sk.isReadable() || sk.isWritable() ) {

  63.                if ( attachment.getSendfileData() != null ) {

  64.                    processSendfile(sk,attachment, false);

  65.                } else {

  66.                    unreg(sk, attachment, sk.readyOps());

  67.                    boolean closeSocket = false;

  68.                    // Read goes before write

  69.                    if (sk.isReadable()) {

  70.                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {

  71.                            closeSocket = true;

  72.                        }

  73.                    }

  74.                    if (!closeSocket && sk.isWritable()) {

  75.                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {

  76.                            closeSocket = true;

  77.                        }

  78.                    }

  79.                    if (closeSocket) {

  80.                        cancelledKey(sk);

  81.                    }

  82.                }

  83.            }

  84.        } else {

  85.            //invalid key

  86.            cancelledKey(sk);

  87.        }

  88.    } catch ( CancelledKeyException ckx ) {

  89.        cancelledKey(sk);

  90.    } catch (Throwable t) {

  91.        ExceptionUtils.handleThrowable(t);

  92.        log.error("",t);

  93.    }

  94. }

创建SocketProcessorBase来处理请求NioSocketWrapper,并从工作线程池中获取一个工作线程来处理SocketProcessorBase,对应上述第13步

 
   
   
 
  1. public boolean processSocket(SocketWrapperBase<S> socketWrapper,

  2.        SocketEvent event, boolean dispatch) {

  3.    try {

  4.        if (socketWrapper == null) {

  5.            return false;

  6.        }

  7.        SocketProcessorBase<S> sc = processorCache.pop();

  8.        if (sc == null) {

  9.            sc = createSocketProcessor(socketWrapper, event);

  10.        } else {

  11.            sc.reset(socketWrapper, event);

  12.        }

  13.        Executor executor = getExecutor();

  14.        if (dispatch && executor != null) {

  15.            executor.execute(sc);

  16.        } else {

  17.            sc.run();

  18.        }

  19.    } catch (RejectedExecutionException ree) {

  20.        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);

  21.        return false;

  22.    } catch (Throwable t) {

  23.        ExceptionUtils.handleThrowable(t);

  24.        // This means we got an OOM or similar creating a thread, or that

  25.        // the pool and its queue are full

  26.        getLog().error(sm.getString("endpoint.process.fail"), t);

  27.        return false;

  28.    }

  29.    return true;

  30. }

线程分析

项目启动时,会有一个主线程main来启动tomcat容器,并且初始化一些操作(开启10个工作线程、创建并运行2个Poller线程、开启监听TCP/IP连接请求),等到执行到Acceptor中的socket = serverSock.accept()(监听TCP/IP连接请求)时,main主线程就结束了

下图中的http-nio-8080-exec-1至http-nio-8080-exec-10即是startInternal开启的10个工作线程,当前并没有任务交给工作线程,所以是WAIT状态;http-nio-8080-ClientPoller-0和http-nio-8080-ClientPoller-1是Poller的2个创建完成后就启动的守护线程,状态是RUNNING;http-nio-8080-Aceeptor-0线程是等到有网络TCP/IP连接进来后,才创建的,创建完成后立即启动,状态为RUNNING;

Tomcat监听并接受Socket连接流程

当Acceptor监听到有请求进来后,首先会启动http-nio-8080-Accptor-0线程,此线程会一直处于运行状态,监听网络连接请求;然后会从10个工作线程中获取一个线程(http-nio-8080-exec-1)来执行刚刚进来的请求

直到请求结束后,会将http-nio-8080-exec-1放回线程池,此时http-nio-8080-exec-1的状态是WAIT


以上是关于Tomcat监听并接受Socket连接流程的主要内容,如果未能解决你的问题,请参考以下文章

Tomcat请求处理流程与源码浅析

半同步/半异步进程池实现流程

面向连接的Socket Server的简单实现(简明易懂)

socket相关操作(下)

python编程系列---tcp服务端的简单实现

TCP套接字端口复用SO_REUSEADDR