从NIO到Netty开发
Posted JAVA高级架构
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从NIO到Netty开发相关的知识,希望对你有一定的参考价值。
文章转自:https://blog.csdn.net/qq285016127/article/details/80393951
1. 从传统BIO到NIO的升级
Client/Server模型是网络编程的基本模型,服务端提供位置信息,客户端通过连接操作向服务端发起连接请求,通过三次握手建立连接,如果连接建立成功,双方就可以通过网络套接字(Socket)进行通信。
传统的Socket编程是服务端一直处于accpet阻塞等待的状态,并且只有客户端发送了请求,服务才会从阻塞状态变成处理任务的状态,当任务处理完了,服务端接着进入阻塞状态,再此看来,服务端和客户端都是同步的情况。
Java NIO(New IO)是一个可以替代标准Java IO API的IO API(从Java 1.4开始),Java NIO提供了与标准IO不同的IO工作方式。
2. NIO新特性
Java NIO: Channels and Buffers(通道和缓冲区)标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。
Java NIO: Non-blocking IO(非阻塞IO) Java NIO可以让你非阻塞的使用IO,例如:当线程从通道读取数据到缓冲区时,线程还是可以进行其他事情。当数据被写入到缓冲区时,线程可以继续处理它。从缓冲区写入通道也类似。
Java NIO: Selectors(选择器) Java NIO引入了选择器的概念,选择器用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个的线程可以监听多个数据通道。
3. NIO服务端实现
根据上图,发现服务端的事件有2个,一是接受连接事件,二是读取数据:
public class Nioserver {
private ByteBuffer readBuffer; private Selector selector; private ServerSocket serverSocket; public static void main(String[] args) {
NIOServer server = new NIOServer();
server.init();
System.out.println("server started:8383");
server.listener();
} public void init() { //1. 创建临时缓冲区
readBuffer = ByteBuffer.allocate(1024); //2. 创建服务端Socket非阻塞通道
ServerSocketChannel serverSocketChannel; try {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); //3. 指定内部Socket绑定的服务端地址 并支持重用端口,因为有可能多个客户端同时访问同一端口
serverSocket=serverSocketChannel.socket();
serverSocket.setReuseAddress(true);
serverSocket.bind(new InetSocketAddress(8383)); //4. 创建轮询器 并绑定到管道上,开始监听客户端请求
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
e.printStackTrace();
}
} private void listener() { while (true) { try { //5. 开始监听事件,不断取出事件的key,假如存在事件,则直接处理。
selector.select();
Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
handleKey(key);
}
} catch (Exception e) {
e.printStackTrace();
}
}
} private void handleKey(SelectionKey key) throws IOException {
SocketChannel channel = null; try { //6. 如果有客户端要连接 这里则处理是否接受连接事件
if (key.isAcceptable()) {
ServerSocketChannel severChannel = (ServerSocketChannel) key.channel();
channel = severChannel.accept();
channel.configureBlocking(false); // 告诉轮询器 接下来关心的是读取客户端数据这件事
channel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) { //7. 如果客户端发送数据,则这里读取数据。
channel = (SocketChannel) key.channel(); // 清空缓冲区
readBuffer.clear(); // 当客户端关闭channel后,会不断收到read事件,此刻read方法返回-1 所以对应的服务器端也需要关闭channel
int readCount = channel.read(readBuffer); if (readCount > 0) {
readBuffer.flip();
String question = CharsetHelper.decode(readBuffer).toString();
System.out.println("server get the question:" + question);
String answer = getAnswer(question);
channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));
} else {
channel.close();
}
}
} catch (Exception e) {
e.printStackTrace();
}finally { //8. 断开连接通道
if (channel!=null) {
channel.close();
}
}
} public static String getAnswer(String question) {
String answer = null; switch (question) { case "who":
answer = "我是小娜\n"; break; case "what":
answer = "我是来帮你解闷的\n"; break; case "where":
answer = "我来自外太空\n"; break; case "hi":
answer = "hello\n"; break; case "bye":
answer = "88\n"; break; default:
answer = "请输入 who, 或者what, 或者where";
} return answer;
}
}
4. NIO客户端实现:
客户端的实现有3个步骤:1.请求连接。2.当连接成功,写数据。3.读取服务端结果。
public class NIOClient implements Runnable {
private BlockingQueue<String> words; private Random random; public static void main(String[] args) { // 多个线程发起Socket客户端连接请求
for (int i = 0; i < 5; i++) {
NIOClient c = new NIOClient();
c.init(); new Thread(c).start();
}
} //1. 初始化要发送的数据
private void init() {
words = new ArrayBlockingQueue<String>(5);
random = new Random(); try {
words.put("hi");
words.put("who");
words.put("what");
words.put("where");
words.put("bye");
} catch (Exception e) { // TODO: handle exception
}
} //2. 启动子线程代码
@Override
public void run() {
SocketChannel channel = null;
Selector selector = null; try { //3. 创建连接服务端的通道 并设置为阻塞方法,这里需要指定服务端的ip和端口号
channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(new InetSocketAddress("localhost", 8383));
selector = Selector.open(); //4. 请求关心连接事件
channel.register(selector, SelectionKey.OP_CONNECT); boolean isOver = false; while (!isOver) {
selector.select();
Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove(); if (key.isConnectable()) { //5. 当通道连接准备完毕,发送请求并指定接收允许获取服务端返回信息
if (channel.isConnectionPending()) { if (channel.finishConnect()) {
key.interestOps(SelectionKey.OP_READ);
channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));
sleep();
} else {
key.cancel();
}
}
} else if (key.isReadable()) {//6. 开始读取服务端返回数据
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
channel.read(byteBuffer);
byteBuffer.flip();
String answer = CharsetHelper.decode(byteBuffer).toString();
System.out.println("client get the answer:" + answer);
String word = getWord(); if (word != null) {
channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));
} else {
isOver = true;
}
sleep();
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally { //7. 关闭通道
if (channel != null) { try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
} public String getWord() { return words.poll();
} private void sleep() { try {
TimeUnit.SECONDS.sleep(random.nextInt(3));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
5.Netty开发简介
上面提到,NIO可以实现同步非阻塞的数据交互,但是对于NIO来说,一个普通的请求数据需要太多的开发步骤,不利于推广,这里主要介绍NIO的实现框架Netty.
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。
6. Netty服务端实现:
public class EchoServer {
private final int port; public EchoServer(int port) { this.port = port;
} public void start() throws Exception { //1.创建线程组
EventLoopGroup group=new NioEventLoopGroup(); try { //2.创建服务端启动对象 装配线程组&交互通道&服务器端口&网络请求处理器链
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(group)
.channel(NioServerSocketChannel.class)
.localAddress("localhost", port)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new EchoOutHandler1());
ch.pipeline().addLast(new EchoOutHandler2());
ch.pipeline().addLast(new EchoInHandler1());
ch.pipeline().addLast(new EchoInHandler2());
}
}); // 3.开始监听客户端请求
ChannelFuture channelFuture = serverBootstrap.bind().sync();
System.out.println("开始监听,端口号为:"+channelFuture.channel().localAddress()); // 4.等待所有请求执行完毕后,关闭通道;如请求还没执行完,这里为阻塞状态。
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally { //5.停止所有线程组内部代码的执行
group.shutdownGracefully().sync();
}
} public static void main(String[] args) throws Exception { new EchoServer(2000).start();
}
}
7.Netty客户端实现:
public class EchoClient {
public static void main(String[] args) throws InterruptedException { new EchoClient("localhost", 2000).start();
} private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port;
} private void start() throws InterruptedException { //1.创建线程组
EventLoopGroup group = new NioEventLoopGroup(); try { //2. 创建客户端启动对象,同样需要装配线程组,通道,绑定远程地址,请求处理器链。
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).remoteAddress(host, port)
.handler(new ChannelInitializer<Channel>() { @Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
}); //3.开始请求连接
ChannelFuture future = bootstrap.connect().sync(); //4.当请求操作结后,关闭通道。
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally { if (group != null) {
group.shutdownGracefully().sync();
}
}
}
}
8.Netty处理器链
对于向服务端发送一个请求,并得到一个响应来说。如果使用Netty来说,需要实现两种不同的处理器,一个是读的一个是写的。他们共同组成一个链式调用,如下图:
对于服务端来说,上面我们创建了4个处理器,他们组成一条链,分别是:EchoInHandler1 -> EchoInHandler2 -> EchoOutHandler2 -> EchoOutHandler1.
public class EchoInHandler1 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("EchoInHandler1 channelRead..."); //将消息传递到新的链。。。
ctx.fireChannelRead(msg);
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}public class EchoInHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("EchoInHandler2 channelRead..."); // Object msg 为Netty的一种缓存对象
ByteBuf buffer = (ByteBuf) msg; byte[] req = new byte[buffer.readableBytes()];
buffer.readBytes(req);
String reqBody = new String(req, "UTF-8");
System.out.println("获取到的客户端请求:" + reqBody); // 往客户端写数据
String date = new Date().toString();
ByteBuf returnBuf = Unpooled.copiedBuffer(date.getBytes());
ctx.write(returnBuf);
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}public class EchoOutHandler2 extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("EchoOutHandler2 write...");
ctx.write(msg);
}
}public class EchoOutHandler1 extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("EchoOutHandler1 write...");
System.out.println("write msg:" + msg);
ctx.write(msg);
ctx.flush();// 最后将数据刷新到客户端
}
}
客户端的处理器主要是当连接成功后,请求获取当前时间,并读取返回结果:
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>{ //客户端连接服务器的时候调用
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端连接服务器。。。。"); byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuf copiedBuffer = Unpooled.buffer(req.length);
copiedBuffer.writeBytes(req);
ctx.writeAndFlush(copiedBuffer);
} //读取服务端数据
@Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf bytbuf) throws Exception {
System.out.println("client get the server's data"); byte[] resp=new byte[bytbuf.readableBytes()];
bytbuf.readBytes(resp);
String respContent = new String(resp,"UTF-8");
System.out.println("返回的数据:"+respContent);
} //强制关闭服务器的连接也会造成异常:远程主机强迫关闭了一个现有的连接。
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getLocalizedMessage());
ctx.close();
}
}
温馨提示
下方广告
下方广告
下方广告
点击支持一下!
以上是关于从NIO到Netty开发的主要内容,如果未能解决你的问题,请参考以下文章