netty源码之接收连接
Posted better_hui
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty源码之接收连接相关的知识,希望对你有一定的参考价值。
目录
3、ServerBootstrapAcceptor注册到worker线程
4、workerGroup 将 socketChannel 注册到选择的NioEventLoop的selector
接收链接
NIO的读事件
while(!stop){//循环遍历selector,休眠时间为1S,当又处于就绪状态的CHannel时,selector将返回该channel的集合。通过对Channel集合的迭代,可进行网络异步读写操作
try {
selector.select(1000);
Set<SelectionKey> selectKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectKeys.iterator();
SelectionKey key = null;
while (it.hasNext()){
key = it.next();
it.remove();
handleInput(key);
}
} catch (IOException e) {
e.printStackTrace();
}
}
netty的接收连接
前话
轮询连接和读事件都是在NioEventLoop对象里。这里有一个run方法。
1、bossGroup 轮询链接事件
//NioEventLoop ,不管是Boss 还是 Worker都是在这里监听
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
//这里是在轮询selector 等待事件
//这里是在轮询selector 等待事件
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
final long iostartTime = System.nanoTime();
try {
//处理刚才监听到的事件
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
// 处理selectedKeys
private void processSelectedKeys() {
if (selectedKeys != null) {
//这个是优化的方法 , 据说性能提高2%
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
//channel绑定的线程不是自己,那么不做处理,可以看到,这是线程安全的。
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//如果是链接或者读事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//注意 如果是链接的事件,走的是 AbstractNioMessageChannel.read
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
2、bossGroup 创建socketChannel
// 接收一个nio的 socketChannel , 并将其封装成NioSocketChannel , 并设置为OP_READ
//注意 如果是链接的事件,走的是 AbstractNioMessageChannel.read
//AbstractNioMessageChannel.NioMessageUnsafe.read
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//在这里是接收了一个socketChannel 并加入到buf
// SocketChannel ch = SocketUtils.accept(javaChannel());
// buf.add(new NioSocketChannel(this, ch));
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//在这里触发 注册的
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
3、ServerBootstrapAcceptor注册到worker线程
ServerBootstrapAcceptor 这是一个关键的类 , 它将Boss监听到链接事件 ,注册到worker线程池
//ServerBootstrapAcceptor
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//这个channel 是 第二步 封装的 NioSocketChannel对象
final Channel child = (Channel) msg;
//这是引导程序配置的handlers
child.pipeline().addLast(childHandler);
//设置参数
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
//选择一个NioEventLoop 注册NioSocketChannel
//next().register(channel)
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
4、workerGroup 将 socketChannel 注册到选择的NioEventLoop的selector
这块和启动流程的注册一样 注册一个 0事件(selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);)
5、workerGroup 注册读事件
DefaultChannelPipeline.HeadContext.read() , 在这里注册时间
以上是关于netty源码之接收连接的主要内容,如果未能解决你的问题,请参考以下文章