nio框架netty在项目中的应用
Posted 六楼外的风景
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了nio框架netty在项目中的应用相关的知识,希望对你有一定的参考价值。
背景:
省分短信发送每天都差不多要1000W条上下,遇到特殊节假日和政府通告时量会更大!boss系统中存放的是短信发送内容,而真正完成发送短信指令动作是的华为方做的短厅,这么大的通信量选择了netty来完成数据传输并自定义了一套基于netty的SGIP协议进行通信;
省分boss系统—>短信营业厅();
基本知识
2.1 TCP/IP网络协议
网上很多有关这个协议的解释,自行google,下面是简单的理解记忆:
tcp/ip的3次握手, 简单来说就是第一次我连接你给你一个标识SYN,你给我返回SYN并给一个新的ACK标记我,然后我再把ACK给你,
这样证明我们之前传东西是可靠,的然后就正式传数据了
图片来自网上
tcp/ip的4次挥手断开,相当于,你给我一个ACK我给你一个FIN,然后再次彼此交换确认,OK就可以结束通信了
图片来自网上
java的socket就是对tcp/ip的一种实现
基础代码,:
一个简单的socket实现tcp/ip的样例,后面的BIO/NIO/AIO都是基本上于这个例子进行变化
client端:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class Client
final static String ADDRESS = "127.0.0.1";
final static int PORT = 7788;
public static void main(String[] args)
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
socket = new Socket(ADDRESS, PORT);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
//向服务器端发送数据
out.println("接收到客户端的请求数据...");
out.println("接收到客户端的请求数据1111...");
String response = in.readLine();
System.out.println("Client: " + response);
...
Server端:
public class Server
final static int PROT = 7788;
public static void main(String[] args)
ServerSocket server = null;
server = new ServerSocket(PROT);
System.out.println(" server start .. ");
//进行阻塞
Socket socket = server.accept();
//新建一个线程执行客户端的任务
new Thread(new ServerHandler(socket)).start();
ServerHandler.java 如下:
public class ServerHandler implements Runnable
private Socket socket ;
public ServerHandler(Socket socket)
this.socket = socket;
@Override
public void run()
BufferedReader in = null;
PrintWriter out = null;
try
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String body = null;
while(true)
body = in.readLine();
if(body == null) break;
System.out.println("Server :" + body);
out.println("服务器端回送响的应数据.");
上面这个代码很简单转换成图型说明就是web浏览器发一个请求过来,web服务器就要new 一个线程来处理这个请求,这是传统的请求处理模型,这也就引来一个很大的问题,当请求越多,服务器端的启用线程也要越多,我们都知道linux(window)的文件句柄数有是限的,默认是1024,当然可以修改,上限好像是65536 ,(一个柄也相当于一个socket也相当于一个thread,linux查看文件句柄Unlimit -a) 其实在实际当中只要并发到1000上下响应请求就会很慢了,所以这种模型是有问题的,这种也就是同步阻塞IO编程(JAVA BIO)
网上查的定义:
同步阻塞IO(JAVA BIO):
同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销.
- BIO—>NIO—->AIO的发展历程
上面的BIO是有问题的,也就是出现在jdk1.4那个古时代的产物,现在当然这要改进下,上面的问题无非就是服务器端的线程无限制的增长才会导致服务器崩掉,那我们就对征下药,加个线程池限制线程的生成,又可以复用空闲的线程,是的,在jdk1.5也是这样做的,下面是服务器端改进后的代码:
public class Server
final static int PORT = 7788;
public static void main(String[] args)
ServerSocket server = null;
BufferedReader in = null;
PrintWriter out = null;
server = new ServerSocket(PORT);
System.out.println("server start");
Socket socket = null;
HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000);
while(true)
socket = server.accept();
executorPool.execute(new ServerHandler(socket));
HandlerExecutorPool.java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class HandlerExecutorPool
private ExecutorService executor;
public HandlerExecutorPool(int maxPoolSize, int queueSize)
this.executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
maxPoolSize,
120L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize));
public void execute(Runnable task)
this.executor.execute(task);
Jdk1.5创造了一个假的nio 用一个HanderExecutorPool来限定了线程数量,但只是解决了服务器端不会因为并发太多而死掉,但解决不了并发大而响应越来越慢的,到时你也会怀疑你是不是真的用了一个假的nio!!!!!!!
为了解决这个问题,就要用三板斧来解决!
别急,要解决一个诸葛亮,你必先要造三个臭皮匠,先引入3个NIO相关概念先!
1> Buffer 缓冲区
难用的buffer是一个抽象的对象,下面还有ByteBuffer,IntBuffer,LongBuffer等子类,相比老的IO将数据直接读/写到Stream对象,NIO是将所有数据都用到缓冲区处理,它本质上是一个数组,提供了位置,容量,上限等操作方法,还是直接看代码代码来得直接
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 7788);//创建连接的地址
SocketChannel sc = null;//声明连接通道
ByteBuffer buf = ByteBuffer.allocate(1024);//建立缓冲区
sc = SocketChannel.open();//打开通道
sc.connect(address);//进行连接
while(true)
//定义一个字节数组,然后使用系统录入功能:
byte[] bytes = new byte[1024];
System.in.read(bytes);
buf.put(bytes);//把数据放到缓冲区中
buf.flip();//对缓冲区进行复位
sc.write(buf);//写出数据
buf.clear();//清空缓冲区数据
2>Channel 通道
如自来水管一样,支持网络数据从Channel中读写,通道写流最大不同是通道是双向的,而流是一个方向上移动(InputStream/OutputStream),通道可用于读/写或读写同时进行,它还可以和下面要讲的selector结合起来,有多种状态位,方便selector去识别. 通道分两类,一:网络读写(selectableChannel),另一类是文件操作(FileChannel),我们常用的是上面例子中的网络读写!
3>Selector 多路复用选择器
它是神一样存在的东西,多路复用选择器提供选择已经就绪的任务的能力,也就是selector会不断轮询注册在其上的通道(Channel),如果某个通道发生了读写操作,这个通道处于就绪状态,会被selector轮询出来,然后通过selectionKey可以取得就绪的Channel集合,从而进行后续的IO操作.
一个多路复用器(Selector)可以负责成千上万个Channel,没有上限,这也是JDK使用epoll代替了传统的selector实现,获得连接句柄没有限制.这也意味着我们只要一个线程负责selector的轮询,就可以接入成千上万个客户端,这是JDK,NIO库的巨大进步.
来张精心整好的图
这个学习进到深水区了,注意罗,下面是服务器端的代码,上面例子代码是client端的,看里面的注解,如果还不明白,多看几次,代码是可运行的,记得要jdk1.7以上版本,多运行,自己意会下,我也只帮到这了!
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class Server implements Runnable
//1 多路复用器(管理所有的通道)
private Selector seletor;
//2 建立缓冲区
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
//3
private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
public Server(int port)
try
//1 打开路复用器
this.seletor = Selector.open();
//2 打开服务器通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//3 设置服务器通道为非阻塞模式
ssc.configureBlocking(false);
//4 绑定地址
ssc.bind(new InetSocketAddress(port));
//5 把服务器通道注册到多路复用器上,并且监听阻塞事件
ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
System.out.println("Server start, port :" + port);
catch (IOException e)
e.printStackTrace();
@Override
public void run()
while(true)
try
//1 必须要让多路复用器开始监听
this.seletor.select();
//2 返回多路复用器已经选择的结果集
Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
//3 进行遍历
while(keys.hasNext())
//4 获取一个选择的元素
SelectionKey key = keys.next();
//5 直接从容器中移除就可以了
keys.remove();
//6 如果是有效的
if(key.isValid())
//7 如果为阻塞状态
if(key.isAcceptable())
this.accept(key);
//8 如果为可读状态
if(key.isReadable())
this.read(key);
//9 写数据
if(key.isWritable())
//this.write(key); //ssc
catch (IOException e)
e.printStackTrace();
private void write(SelectionKey key)
//ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//ssc.register(this.seletor, SelectionKey.OP_WRITE);
private void read(SelectionKey key)
try
//1 清空缓冲区旧的数据
this.readBuf.clear();
//2 获取之前注册的socket通道对象
SocketChannel sc = (SocketChannel) key.channel();
//3 读取数据
int count = sc.read(this.readBuf);
//4 如果没有数据
if(count == -1)
key.channel().close();
key.cancel();
return;
//5 有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
this.readBuf.flip();
//6 根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
byte[] bytes = new byte[this.readBuf.remaining()];
//7 接收缓冲区数据
this.readBuf.get(bytes);
//8 打印结果
String body = new String(bytes).trim();
System.out.println("Server : " + body);
// 9..可以写回给客户端数据
catch (IOException e)
e.printStackTrace();
private void accept(SelectionKey key)
try
//1 获取服务通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//2 执行阻塞方法
SocketChannel sc = ssc.accept();
//3 设置阻塞模式
sc.configureBlocking(false);
//4 注册到多路复用器上,并设置读取标识
sc.register(this.seletor, SelectionKey.OP_READ);
catch (IOException e)
e.printStackTrace();
public static void main(String[] args)
new Thread(new Server(7788)).start();;
如果你理解了Java NIO ,下面讲的netty也是水到渠成的事,只想说,深水区已过了!
差点忘记还要补下AIO的,这个比NIO先进的技术,最终实现了
netty
这是神一样存在的java nio框架, 这个偏底层的东西,可能你接触较少却又无处不在,比如:
在业界有一篇无法超越的netty入门文章,我也没这个能力超越,只能双手奉上,你们好好研读,必然学有所成!
http://ifeve.com/netty5-user-guide/
还有杭州华为的李林锋写的 Netty权威指南 ,我是买了一本,不知如何评论好,中等吧!
刚好,我这边要修改的项目也是华为的………………
未完,先发…………………………………..
以上是关于nio框架netty在项目中的应用的主要内容,如果未能解决你的问题,请参考以下文章
netty系列之:不用怀疑,netty中的ByteBuf就是比JAVA中的好用
高性能异步事件驱动的NIO框架,结合英雄传说项目深入剖析Netty
轻松上手NIO的框架技术Netty,Mina,不看你肯定后悔!!!