Tomcat监听并接受Socket连接流程
Posted 算法技术猿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Tomcat监听并接受Socket连接流程相关的知识,希望对你有一定的参考价值。
tomcat建立Socket监听TCP连接,获取连接后委托SocketProcessor处理器处理后续流程
流程:
1、启动tomcat 2、启动tomcat完成后调用NioEndpoint的startInternal()方法初始化操作 3、createExecutor():初始化创建10个工作线程的线程池,此线程池最大只能创建200个线程,最小空闲工作线程为10 4、设置tomcat并发最大连接数为10000 5、创建并运行2个Poller守护线程,其作用是将Acceptor接收的连接请求委托给SocketProcessor处理器 6、startAcceptorThreads():创建一个Acceptor线程对象,用于监听网络TCP/IP连接,并将Acceptor线程对象作为一个守护线程启动,启动线程后就一直做死循环监听TCP连接 7、如果当前连接数大于10000,则等待其他连接释放,如果当前连接数小于10000,则继续后续操作 8、socket = serverSock.accept()等待TCP网络请求进来 9、socket = serverSock.accept();有返回值时,说明有一个TCP连接进来,然后执行setSocketOptions(),将当前连接注册到Poller中 10、getPoller0().register(channel);由于Poller中的两个线程一直在运行,所以当channel注册到Poller中时,Poller中的selector.select()就会得到当前连接 11、Poller.run()方法中,当selector.select()有返回值时,循环遍历每个值SelectionKey,然后调用processKey(SelectionKey,attachment) 12、processKey()方法中调用processSocket()方法,创建一个SocketProcessorBase线程对象处理器来处理连接请求 13、从工作线程池中获取一个工作线程来运行SocketProcessorBase线程对象,其具体实现在NioEndpoint的内部类SocketProcessor.doRun()
NioEndpoint源码分析:
tomcat容器启动成功后,会开启一个NIO端点,创建含有10个工作线程(http-nio-8080-exec-xxx)的线程池,最大并发连接数为10000,然后创建大小为2的Poller线程对象数组,并启动数组中的2个守护线程,然后启动一个Acceptor线程监听网络TCP/IP的连接请求,监听到请求后,注册到Poller线程对象中,Poller线程对象委托SocketProcessor处理请求 startInternal()源码对应上述流程第1-5步:
/**
* Start the NIO endpoint, creating acceptor, poller threads.
*/
@Override
public void startInternal() throws Exception {
if (!running) {
//启动Acceptor线程时,会判断running和paused 状态
running = true;
paused = false;
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getBufferPool());
// Create worker collection
//创建10个工作线程
if ( getExecutor() == null ) {
createExecutor();
}
//初始化最大并发连接数为10000
initializeConnectionLatch();
// Start poller threads
//创建有2个守护线程的poller数组,并且启动这2个守护线程
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
//启动Acceptor线程,监听网络TCP/IP的连接请求
startAcceptorThreads();
}
}
创建Acceptor线程:startAcceptorThreads() 对应上述流程第6步
protected final void startAcceptorThreads() {
//创建一个线程数为1的Acceptor的数组
int count = getAcceptorThreadCount();
acceptors = new Acceptor[count];
for (int i = 0; i < count; i++) {
//创建Acceptor监听网络TCP/IP请求,具体实现在NioEndpoint中
acceptors[i] = createAcceptor();
String threadName = getName() + "-Acceptor-" + i;
acceptors[i].setThreadName(threadName);
Thread t = new Thread(acceptors[i], threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
Acceptor线程启动逻辑:对应上述流程第7-9步
/**
* 是一个监听网络TCP/IP连接的后台线程,接收到连接后会交给合适的处理器(SocketProcessor/ConnectionHandler)处理,Acceptor并非直接将连接请求交给SocketProcessor,而是经过了Poller
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
// 直到接受到关闭连接的命令,否则一直循环下去
while (running) {
// Loop if endpoint is paused
//如果处于暂定状态,则一直等到paused为false
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!running) {
break;
}
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
// 如果还没有到达最大连接数,则当前连接数量做加一操作,
// 如果到达最大连接数则让当前接收线程处于等待状态,直到有连接被释放
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
// Accept the next incoming connection from the server
// socket
//从服务器套接字serverSock接收下一个进入的连接请求,如果没有连接请求,则一直阻塞,直到有连接到来才继续后续的处理
socket = serverSock.accept();
} catch (IOException ioe) {
// We didn't get a socket
countDownConnection();
if (running) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (running && !paused) {
// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
// 将当前连接请求注册到Poller中
if (!setSocketOptions(socket)) {
//关闭socket,并将连接数-1
closeSocket(socket);
}
} else {
//关闭socket,并将连接数-1
closeSocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED;
}
private void closeSocket(SocketChannel socket) {
countDownConnection();
try {
socket.socket().close();
} catch (IOException ioe) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.err.close"), ioe);
}
}
try {
socket.close();
} catch (IOException ioe) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.err.close"), ioe);
}
}
}
}
将TCP连接请求注册到Poller线程对象中,对应上述流程第10步
/**
* 处理指定的连接套接字
* Process the specified connection.
* @param socket The socket channel
* @return <code>true</code> if the socket was correctly configured
* and processing may continue, <code>false</code> if the socket needs to be
* close immediately
*/
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
NioChannel channel = nioChannels.pop();
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
//从Poller中获取一个线程来处理新接收到的请求连接,委托其完成相应处理,委托完成后,当前线程继续自己被设定的监听接收委托任务
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("",t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
Poller中定义有一个Selector(选择器),它是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件,一个单独的线程可以管理多个channel,从而管理多个网络连接。Poller线程开启后一直处于运行状态,待Poller中的selector.select()有返回值时,继续处理后续流程,否则一直阻塞,对应上述流程第11-12步
@Override
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
if (!close) {
hasEvents = events();
//获取selector中的连接数量,只有keyCount大于0时,才处理后续流程,否则一直阻塞
if (wakeupCounter.getAndSet(-1) > 0) {
//if we are here, means we have other stuff to do
//do a non blocking select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("",x);
continue;
}
//either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NiosocketWrapper attachment = (NioSocketWrapper)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, attachment);
}
}//while
//process timeouts
timeout(keyCount,hasEvents);
}//while
getStopLatch().countDown();
}
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
} else {
//invalid key
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("",t);
}
}
创建SocketProcessorBase来处理请求NioSocketWrapper,并从工作线程池中获取一个工作线程来处理SocketProcessorBase,对应上述第13步
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
线程分析
项目启动时,会有一个主线程main来启动tomcat容器,并且初始化一些操作(开启10个工作线程、创建并运行2个Poller线程、开启监听TCP/IP连接请求),等到执行到Acceptor中的socket = serverSock.accept()(监听TCP/IP连接请求)时,main主线程就结束了
下图中的http-nio-8080-exec-1至http-nio-8080-exec-10即是startInternal开启的10个工作线程,当前并没有任务交给工作线程,所以是WAIT状态;http-nio-8080-ClientPoller-0和http-nio-8080-ClientPoller-1是Poller的2个创建完成后就启动的守护线程,状态是RUNNING;http-nio-8080-Aceeptor-0线程是等到有网络TCP/IP连接进来后,才创建的,创建完成后立即启动,状态为RUNNING;
当Acceptor监听到有请求进来后,首先会启动http-nio-8080-Accptor-0线程,此线程会一直处于运行状态,监听网络连接请求;然后会从10个工作线程中获取一个线程(http-nio-8080-exec-1)来执行刚刚进来的请求
直到请求结束后,会将http-nio-8080-exec-1放回线程池,此时http-nio-8080-exec-1的状态是WAIT
以上是关于Tomcat监听并接受Socket连接流程的主要内容,如果未能解决你的问题,请参考以下文章