IT新手之路Netty学习之旅

Posted 花粉帮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了IT新手之路Netty学习之旅相关的知识,希望对你有一定的参考价值。


【IT新手之路】Netty学习之旅(一)



你知道网络架构分层吗?TCP/UDP各有什么优缺点? 你知道网络通信过程吗?一次https请求要经历哪些流程? 你知道网络IO模型吗?c10k问题是如何解决的? 你知道Java如何实现网络编程吗?Netty在Java Nio上做了哪些优化呢?

个人认为,学习知识,应该先构建知识全貌,然后再针对核心知识点进行学习,最终将每个知识点串联起来,融会贯通,即最终要构建我们的知识体系.所以,这篇文章的目的,是让你了解netty在网络编程中的知识全貌,先在脑海里有了这些知识点的概念,才能避免"不识庐山真面目,只缘身在此山中"的问题。



一、Netty是什么

官网: https://netty.io/index.html 

github: https://github.com/netty/netty

Netty is an asynchronous event-driven network application frameworkfor rapid development of maintainable high performance protocol servers & clients.

Netty是一个异步、事件驱动的网络应用框架,用于快速开发可维护、高性能协议服务端和客户端。

下图是netty所包含的组件:

【IT新手之路】Netty学习之旅(一)

可以看到netty包含三个模块:

  • Core 核心模块: 可扩展的事件模型;通用的通信api;有零拷贝能力的Byte Buffer

  • Transport Services 传输服务: socket、UDP、http tunnel等

  • Protocol Support 协议支持: http、websocket、rtsp等协议的支持

我们来看看netty涉及到的知识层级划分:

【IT新手之路】Netty学习之旅(一)

  • netty可以应用于各rpc框架,如dubbo等

  • netty是基于Java NIO实现的,并在此基础上做了优化

  • netty是基于reactor模式,Linux IO多路复用模型

  • netty可以实现传输层TCP/UDP协议的通信

  • netty的多路复用器基于poll、epoll系统调用实现,通过mmap、sendfile实现零拷贝

Netty有哪些优点
  • 性能优秀,高吞吐、低延迟

  • 可扩展性好,基于事件驱动模型、可自定义线程模型

  • 支持非阻塞的socket

  • 文档齐全、社区活跃

相比于市面其他网络编程框架,为什么要选择使用Netty

•Apache Mina netty与mina是同一作者的不同产品,netty是作者用来针对mina提高扩展性与解决一些已知问题的产品,可以理解为netty是mina的升级版

  • Sun Grizzly 社区没有Netty活跃,更新没有netty频繁

  • Apple Swift NIO 非java语言

  • tomcat/jetty tomcat/jetty是Servlet容器,主要应用基于Servlet的Http协议的处理;而netty不仅支持Http,还支持FTP、SSH等应用层协议以及传输层的UDP协议,并且还支持自定义协议;

https://java.libhunt.com/compare-netty-vs-mina https://java.libhunt.com/compare-grizzly-vs-netty

为什么不直接使用Java Nio,而使用Netty呢?
  • Netty支持常用的应用层协议如Http,而Java Nio需要开发人员去手动进行协议处理

  • Netty处理了TCP粘包半包问题,而Java Nio未处理

  • Netty支持流浪整形,而Java Nio不支持

  • Netty完善的处理了断连、空闲等异常处理

  • Netty修复了一些Java Nio的bug


二、如何去学习Netty

如何学习一个框架,重要的不是学习如何去使用它,也不是仅仅去读一读源码,而是学习它的过程中,理解它的原理及设计思路,这样当你学习其他相关框架,或自己设计系统的时候,就能事半功倍.
在上面的[知识层级划分图]中,黄色部分是偏底层及理论的知识,让我们通过两个Java程序,来了解下它们;

Java NIO Demo

什么是NIO,个人理解,操作系统层面,是Non-blocking IO,Java层面叫它new IO或许比Non-blocking IO合适,因为它并不是完全非阻塞的,比如Selector.select()就是阻塞的方法;

让我们来看一个比较原始的Java NIO demo
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Date;import java.util.Iterator;
public class Nioserver implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public static void main(String[] args) { new Thread(new NioServer(8765), "NioServer-001").start(); }
public NioServer(int port) { try { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
System.out.println("Server start, port :" + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } }
@Override public void run() { while(true){ try { selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); SelectionKey key = null; while(iterator.hasNext()) { key = iterator.next(); iterator.remove(); try { handleEvent(key); } catch (IOException e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); } } }
private void handleEvent(SelectionKey key) throws IOException { if (key.isValid()) { if (key.isAcceptable()) { handleAccept(key); } if (key.isReadable()) { handleRead(key); } } }
private void handleAccept(SelectionKey key) throws IOException { System.out.println("connect accept,time" + System.currentTimeMillis()); ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); sc.register(this.selector, SelectionKey.OP_READ); }
private void handleRead(SelectionKey key) throws IOException { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes).trim(); System.out.println("server recive body:" + body);
String response = "currentTime=" + new Date().toString(); doWrite(sc, response); } else if (readBytes < 0) { key.cancel(); sc.close(); } }
private void doWrite(SocketChannel socketChannel, Strin response) throws IOException { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); socketChannel.write(writeBuffer); if (!writeBuffer.hasRemaining()) { System.out.println("response 2 client succeed:" + response); } }
这是一段Java Nio Server端的代码,主要功能就是client端发消息过来后,回复一条消息;我们主要看几个核心的Java对象及方法对应的内核系统调用,了解下在操作系统内核层面的实现;
   服务器版本: CentOS Linux release 7.6.1810
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
对应内核调用的bind及listen函数,表示将 fd=9 的socket绑定到本机(0.0.0.0)的8765端口号上,并开启监听;
bind(9, {sa_family=AF_INET, sin_port=htons(8765), sin_addr=inet_addr("0.0.0.0")}, 16) = 0listen(9, 1024) 
selector = Selector.open();
对应内核调用的epoll_create,表示创建一个fd=8的多路复用器;
epoll_create(256) = 8
serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
对应内核使用的epoll_ctl,表示将fd=9的socket的读事件注册到fd=8的多路复用器上,就能监听到这个socket的读事件了;
epoll_ctl(8, EPOLL_CTL_ADD, 9, {EPOLLIN, {u32=9, u64=4538492564453457929}}) = 0
selector.select();
对应内核使用的epoll_wait,表示fd=8的多路复用器阻塞等待事件发生;
epoll_wait(8,
SocketChannel sc = ssc.accept();
当Client发起对Server端的连接后,Server端内核调用accept,接收此连接,并生成一个新的fd=11的socket;
accept(9, {sa_family=AF_INET, sin_port=htons(35744), sin_addr=inet_addr("127.0.0.1")}, [16]) = 11
这里有个小细节,接收请求与读写不是在同一个socket,这也是为什么读写事件并不是把serverSocketChannel注册到Selector上;
从上面内容,我们了解Java Nio中的一些核心组件,也让我们了解到了Java NIO通信过程中,会涉及到TPC/IP协议、多路复用器、以及一些内核系统调用;
关于上面代码的系统调用过程,可以参考另一篇文章:

Multiple Reactor Threads Demo

Multiple Reactor Threads是Reactor Pattern中的主从Reactor多线程模式;关于reactor模式,可以阅读参考文档4及参考文档5)

import java.io.IOException;
public class Server {
public static void main(String[] args) throws IOException { MainReactor reactor = new MainReactor(8765); new Thread(reactor, "mainReactor").start(); }}
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.util.Iterator;
public class MainReactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocketChannel;
public MainReactor(int port) throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); selectionKey.attach(new Acceptor()); }
@Override public void run() { while (!Thread.interrupted()) { try { if (this.selector.select(1000L) > 0) { Iterator<SelectionKey> selectionKeys = this.selector.selectedKeys().iterator(); while (selectionKeys.hasNext()) { SelectionKey key = selectionKeys.next(); if (!key.isValid()) { continue; } selectionKeys.remove(); dispatch(key); } } } catch (IOException e) { e.printStackTrace(); } } }
void dispatch(SelectionKey selectionKey) throws IOException { EventHandler eventHandler = (EventHandler) selectionKey.attachment(); eventHandler.process(selectionKey); }}import java.io.IOException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;
public class SubReactor implements Runnable {
final Selector selector;
public SubReactor() throws IOException { this.selector = Selector.open();
} public void register(SocketChannel socketChannel) throws IOException { SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ); selectionKey.attach(new Handler()); }
@Override public void run() { while (!Thread.interrupted()) { try { if (this.selector.select(1000L) > 0) { Iterator<SelectionKey> selectionKeys = this.selector.selectedKeys().iterator(); while (selectionKeys.hasNext()) { SelectionKey key = selectionKeys.next(); if (!key.isValid()) { continue; } selectionKeys.remove(); dispatch(key); } } } catch (IOException e) { e.printStackTrace(); } } }
void dispatch(SelectionKey selectionKey) throws IOException { EventHandler eventHandler = (EventHandler) selectionKey.attachment(); eventHandler.process(selectionKey); }}import java.io.IOException;import java.nio.channels.SelectionKey;
public interface EventHandler { void process(SelectionKey selectionKey) throws IOException;}import java.io.IOException;import java.nio.channels.SelectionKey;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;
public class Acceptor implements EventHandler {
SubReactor[] subReactors;
int index;
public Acceptor() throws IOException { SubReactor subReactor01 = new SubReactor(); SubReactor subReactor02 = new SubReactor(); SubReactor subReactor03 = new SubReactor(); this.subReactors = new SubReactor[]{subReactor01, subReactor02, subReactor03};
new Thread(subReactor01, "subReactor01").start(); new Thread(subReactor02, "subReactor02").start(); new Thread(subReactor03, "subReactor03").start(); }
@Override public void process(SelectionKey selectionKey) throws IOException { if (!selectionKey.isAcceptable()) { return; } ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); SubReactor subReactor = subReactors[index]; index++; if (index >= subReactors.length) { index = 0; } subReactor.register(socketChannel); }}import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.SocketChannel;import java.util.Date;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;
public class Handler implements EventHandler {
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
@Override public void process(SelectionKey selectionKey) throws IOException { executorService.execute(() -> { try { process2(selectionKey); } catch (IOException e) { e.printStackTrace(); } }); }
private void process2(SelectionKey selectionKey) throws IOException { if (selectionKey.isReadable()) { processRead(selectionKey); } }
private void processRead(SelectionKey key) throws IOException { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes).trim(); System.out.println(Thread.currentThread().getName() + " server recive body:" + body);
String response = "currentTime=" + new Date().toString(); processWrite(sc, response); } else if (readBytes < 0) { key.cancel(); sc.close(); } }
/** * 由于是本地测试demo代码,所以针对写事件,并没有去注册OP_WRITE; */ private void processWrite(SocketChannel socketChannel, String response) throws IOException { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); socketChannel.write(writeBuffer); if (!writeBuffer.hasRemaining()) { System.out.println("response 2 client succeed:" + response); } }}
上面的代码是Reactor模式里的Multiple Reactor Threads模式,可以看到它的结构图如下: 

【IT新手之路】Netty学习之旅(一)


Netty就是采用的这种模式,不仅Netty会使用Reactor模式,redis(单reactor单线程)、nginx等应用都使用了reactor,那为什么要用这种模式,它有什么好处?我们在后面的文章进行说明;


三、小结

通过之前的内容,我们了解了Netty所涉及到的相关知识: Java NIO API、Reactor模式、Linux IO模式、TCP/IP、以及一些系统调用;当然,还有未提及到的zero-copy、内存分配、编解码等知识也很重要;

你可能会问,上面的Java程序并没有使用到Netty啊,是的,确实没使用到Netty,但是Netty是基于Java NIO开发的,弄清楚了Java NIO,学习Netty事半功倍;

或许你还心存疑惑,毕竟这篇文章只是Netty学习系列的开篇,它还不够详细,但它的目的只是为了让你大致了解Netty的知识体系,激发你的学习兴趣,让我们在后续的文章中继续我们的Netty旅程,并能有深度得回答本文开头的那些问题;


四、参考文档

1.https://netty.io/index.html

2.https://java.libhunt.com/compare-netty-vs-mina

3.https://java.libhunt.com/compare-grizzly-vs-netty

4.http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

5.http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf

6.UNIX网络编程卷1:套接字联网API





以上是关于IT新手之路Netty学习之旅的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin学习之旅解决错误:kotlin.NotImplementedError: An operation is not implemented: Not yet implemented(代码片段

我的OpenGL学习进阶之旅解决着色器语法错误:The shader uses varying u_Color, but previous shader does not write to it

我的OpenGL学习进阶之旅解决着色器语法错误:The shader uses varying u_Color, but previous shader does not write to it

我的OpenGL学习进阶之旅解决着色器语法错误:The shader uses varying u_Color, but previous shader does not write to it

我的C语言学习进阶之旅解决 Visual Studio 2019 报错:错误 C4996 ‘fscanf‘: This function or variable may be unsafe.(代码片段

我的C语言学习进阶之旅解决 Visual Studio 2019 报错:错误 C4996 ‘fscanf‘: This function or variable may be unsafe.(代码片段