Netty笔记

Posted 菜丸的程序屋

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty笔记相关的知识,希望对你有一定的参考价值。

字节跳动就是我心中的耶路撒冷!


netty介绍:

    Netty是一个NIO框架,使用它可以简单快速地开发网络应用程序,比如客户端和服务端的协议。Netty大大简化了网络程序的开发过程比如TCP和UDP的 Socket的开发。



netty最核心的组件:
  1. EventLoop 

  2. Event

  3. Channel

  4. ChannelPipeline

  5. ChannelHandler

  6. ChannelContextHandler  

netty的组件

Bootstrap:

netty的辅助启动器,netty客户端和服务器的入口,Bootstrap是创建客户端连接的启动器,ServerBootstrap是监听服务端端口的启动器,跟tomcatBootstrap类似,程序的入口。

  Channel:

关联jdk原生socket的组件,常用的是NioserverSocketChannelNioSocketChannelNioServerSocketChannel负责监听一个tcp端口,有连接进来通过boss reactor创建一个NioSocketChannel将其绑定到worker reactor,然后worker reactor负责这个NioSocketChannel的读写等io事件。

A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind.A channel provides a user:* the current state of the channel (e.g. is it open? is it connected?),* the [configuration parameters](https://netty.io/4.1/api/io/netty/channel/ChannelConfig.html "interface in io.netty.channel") of the channel (e.g. receive buffer size),* the I/O operations that the channel supports (e.g. read, write, connect, and bind), and*   the [`ChannelPipeline`](https://netty.io/4.1/api/io/netty/channel/ChannelPipeline.html "interface in io.netty.channel") which handles all I/O events and requests associated with the channel.
EventLoop:
netty最核心的几大组件之一,就是我们常说的 reactor ,人为划分为 boss reactor worker reactor 通过 EventLoopGroup Bootstrap 启动时会设置 EventLoopGroup )生成,最常用的是nio的 NioEventLoop ,就如同 EventLoop 的名字, EventLoop 内部有一个无限循环,维护了一个 selector ,处理所有注册到 selector 上的 io 操作,在这里实现了一个线程维护多条连接的工作。
ChannelPipeline:

netty最核心的几大组件之一,ChannelHandler的容器,netty处理io操作的通道,与ChannelHandler组成责任链。write、read、connect等所有的io操作都会通过这个ChannelPipeline,依次通过ChannelPipeline上面的ChannelHandler处理,这就是netty事件模型的核心。ChannelPipeline内部有两个节点,headtail,分别对应着ChannelHandler链的头和尾。

A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel.ChannelPipeline implements an advanced form of the Intercepting Filter pattern to give a user full control over how an event is handled and how the ChannelHandlers in a pipeline interact with each other.
ChannelHandler:
netty 最核心的几大组件之一, netty 处理 io 事件真正的处理单元,开发者可以创建自己的 ChannelHandler 来处理自己的逻辑,完全控制事件的处理方式。 ChannelHandler ChannelPipeline 组成责任链,使得一组 ChannelHandler 像一条链一样执行下去。 ChannelHandler 分为 inBound outBound ,分别对应 io read write 的执行链。 ChannelHandler ChannelHandlerContext 包裹着,有 prev next 节点,可以获取前后 ChannelHandler read 时从 ChannelPipeline head 执行到 tail write 时从 tail 执行到 head ,所以 head 既是 re ad 事件的起点也是 write 事件的终点,与 io 交互最紧密。
使用ChannelHandlerContext在handler之间传递事件public class MyInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("Connected!"); ctx.fireChannelActive(); } }
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) { System.out.println("Closing .."); ctx.close(promise); } }
Unsafe:
顾名思义这个类就是不安全的意思,但并不是说这个类本身不安全,而是不要在应用程序里面直接使用 Unsafe 以及他的衍生类对象,实际上 Unsafe 操作都是在 reactor 线程中被执行。 Unsafe Channel 的内部类,并且是 protecte d修饰的,所以在类的设计上已经保证了不被用户代码调用。 Unsafe 的操作都是和 jdk 底层相关。 EventLoop 轮询到 read accept 事件时,会调用 unsafe.read() unsafe 再调用 ChannelPipeline 去处理事件;当发生 write 事件时,所有写事件都会放在 EventLoop task 中,然后从 ChannelPipeline tail 传播到 head ,通过 Unsafe 写到网络中。

Client端的代码实现

import io.netty.bootstrap.Bootstrap;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;/** * Netty客户端的程序 * @author CaiWan */public class Client { /*IP地址*/ static final String HOST = System.getProperty("host", "127.0.0.1"); /*端口号*/ static final int PORT1 = Integer.parseInt(System.getProperty("port", "8765"));
static final int PORT2 = Integer.parseInt(System.getProperty("port", "8764"));
public static void main(String[] args) throws Exception { EventLoopGroup workgroup = new NioEventLoopGroup(); Bootstrap b = new Bootstrap();//客户端 b.group(workgroup) .channel(NioSocketChannel.class)//客户端 -->NioSocketChannel .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<SocketChannel>() {//handler @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ClientHandler()); } }); //创建异步连接 可添加多个端口 ChannelFuture cf1 = b.connect(HOST, PORT1).sync(); ChannelFuture cf2 = b.connect(HOST, PORT2).sync();
//buf //client向server端发送数据 Buffer形式 cf1.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty".getBytes())); cf2.channel().writeAndFlush(Unpooled.copiedBuffer("hello world".getBytes()));

cf1.channel().closeFuture().sync(); cf2.channel().closeFuture().sync();
workgroup.shutdownGracefully(); }}

Servler端代码实现:

import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * Netty实现的服务端程序 * @author CaiWan */public class Server{ /*端口号*/ static final int PORT1 = Integer.parseInt(System.getProperty("port", "8765"));
static final int PORT2 = Integer.parseInt(System.getProperty("port", "8764"));
    public static void main(String[] args){ EventLoopGroup bossGroup = null; EventLoopGroup workerGroup = null; ServerBootstrap b = null; try{ //1:第一个线程组是用于接收Client连接的 bossGroup = new NioEventLoopGroup(); //(1) //2:第二个线程组是用于实际的业务处理操作的 workerGroup = new NioEventLoopGroup(); //3:创建一个启动NIO服务的辅助启动类ServerBootstrap 就是对我们的Server进行一系列的配置 b = new ServerBootstrap();//(2) //4:绑定两个线程组 b.group(bossGroup, workerGroup) //5:需要指定使用NioServerSocketChannel这种类型的通道 .channel(NioServerSocketChannel.class)//(3) 服务端 -->NioServerSocketChannel //6:一定要使用childHandler 去绑定具体的事件处理器 .childHandler(new ChannelInitializer<SocketChannel>() //(4) childHandler { @Override protected void initChannel(SocketChannel sc) throws Exception{ //7:将自定义的serverHandler加入到管道中去(多个) sc.pipeline().addLast(new ServerHandler());//handler中实现真正的业务逻辑// sc.pipeline().addLast(new ServerHandler());// sc.pipeline().addLast(new ServerHandler()); } }) /** * 服务器端TCP内核模块维护两个队列,我们称之为A,B吧 * 客户端向服务端connect的时候,会发送带有SYN标志的包(第一次握手) * 服务端收到客户端发来的SYN时,向客户端发送SYN ACK确认(第二次握手) * 此时TCP内核模块把客户端连接加入到A队列中,最后服务端收到客户端发来的ACK时(第三次握手) * TCP内核模块把客户端连接从A队列移到B队列,连接成功,应用程序的accept会返回 * 也就是说accept从B队列中取出完成三次握手的连接 * A队列和B队列的长度之和是backLog,当A,B队列的长度之和大于backLog时,新连接将会被TCP内核拒绝 * 所以,如果backLog过小,可能会出现accept速度跟不上,A,B队列满了,导致新的客户端无法连接, * 要注意的是,backLog对程序支持的连接数并无影响,backLog影响的只是还没有被accept取出的连接 */ //8:设置TCP连接的缓冲区 .option(ChannelOption.SO_BACKLOG, 128)//(5)// .option(ChannelOption.SO_SNDBUF, 32*1024) //设置发送缓冲大小// .option(ChannelOption.SO_RCVBUF, 32*1024) //设置接收缓冲大小 //9:保持连接 .childOption(ChannelOption.SO_KEEPALIVE, true);//(6) //10:绑定指定的端口 进行监听 //此处端口号先写死 也可以绑定多个端口 ChannelFuture cf2= b.bind(PORT1).sync(); // (7)
ChannelFuture cf3= b.bind(PORT2).sync(); // (7) 绑定多个端口
//Thread.sleep(10000); cf2.channel().closeFuture().sync(); //异步等待关闭 cf3.channel().closeFuture().sync(); //异步等待关闭
}catch(Exception e){ e.printStackTrace(); }finally{ workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }}

实现数据传输的业务逻辑层代码的实现

import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import io.netty.util.ReferenceCountUtil;
/** * 客户端业务处理类 * (编写主要的业务逻辑) * @author CaiWan */public class ClientHandler extends ChannelHandlerAdapter{ // ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放。 // 请记住处理器的职责是释放所有传递到处理器的引用计数对象。 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{ try{ //do something //接收服务端发来的数据 ByteBuf ByteBuf buf = (ByteBuf)msg; //创建一个和buf一样长度的字节空数组 byte[] data = new byte[buf.readableBytes()]; //将buf中的数据读取到data数组中 buf.readBytes(data); //将data数组惊醒包装 以String格式输出 String response = new String(data,"utf-8"); System.out.println("client :"+response);
//以上代码是接收服务端发来的反馈数据//
ctx.close(); }finally{ // Discard the received data silently. ReferenceCountUtil.release(msg); } }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception{ // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); }}



import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import io.netty.util.ReferenceCountUtil;
/** * 服务端业务处理类 * (编写主要的业务逻辑) * @author CaiWan */public class ServerHandler extends ChannelHandlerAdapter{
/** * 每当从客户端收到新的数据时,这个方法会在收到消息时被调用 * ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放。 * 请记住处理器的职责是释放所有传递到处理器的引用计数对象。 */ @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{ try{ //do something //接收客户端发送的数据 ByteBuf ByteBuf buf = (ByteBuf)msg; //创建一个和buf长度一样的空字节数组 byte[] data = new byte[buf.readableBytes()]; //将buf中的数据读取到data数组中 buf.readBytes(data); //将data数据包装成string输出 String request = new String(data,"utf-8"); System.out.println("server :"+request);
//以上代码是接收客户端信息//
//server端向client发送反馈数据 //如果是绑定了多个端口 那么都会进行发送 ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes())) .addListener(ChannelFutureListener.CLOSE);//添加监听 当服务端向客户端发送完数据后,关闭connect连接 /** * ChannelFutureListener,当一个写请求完成时通知并且关闭Channel * 加上监听 意味着服务端回送数据到客户端时 连接关闭(短连接) * 不加监听 意味着客户端与服务端一直保持连接状态(长连接) */

ctx.close(); }finally{ // Discard the received data silently. ReferenceCountUtil.release(msg); } }
/** * exceptionCaught()事件处理方法是当出现Throwable对象才会被调用 * 即当Netty由于IO错误或者处理器在处理事件时抛出的异常时 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception{ // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); }
}

Netty笔记(一)Netty笔记(一)Netty笔记(一)Netty笔记(一)

客户端向服务器发送HelloNetty-Server!服务器向客户端返回Hello Netty-Client!

服务端ChannelHandler

import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;
public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; System.out.println("服务器接收:"+in.toString(CharsetUtil.UTF_8)); in.clear(); String str = "Hello Netty-Client!"; in.writeBytes(str.getBytes()); ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); }   @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close();  }}

服务端Server

import java.net.InetSocketAddress;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Server { private final int port; public Server(int port) { this.port = port; }
public static void main(String[] args) { int port = 8989; try { new Server(port).start(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
private void start() throws InterruptedException { final ServerHandler serverHandler = new ServerHandler(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(group) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress( port)) .childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(serverHandler); } }); ChannelFuture future = bootstrap.bind().sync(); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync();    }   }}

客户端ChannelHandler

import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.CharsetUtil;
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Netty-Server!",CharsetUtil.UTF_8)); }
@Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("客户端接收到消息:"+msg.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close();  }}

客户端Client

import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;
public class Client {   private final int port; private final String host;   public Client(int port, String host) { this.port = port; this.host = host;  }
  public static void main(String[] args) { String host = "127.0.0.1"; int port = 8989; try { new Client(port, host).start(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace();    }  }private void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap .group(group) .channel(NioSocketChannel.class) .remoteAddress(host, port) .handler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ClientHandler());            }         }); ChannelFuture future = bootstrap.connect().sync(); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync();    }      }}

导入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.xm</groupId> <artifactId>netty</artifactId> <version>0.0.1-SNAPSHOT</version>  <dependencies> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.16.Final</version> </dependency> </dependencies></project>



  1. 参考书籍:Netty实战(Norman Maurer & Marvin Allen Wolfthal)

  2. 参考官方文档:https://netty.io/wiki/user-guide.html


最后

 



以上是关于Netty笔记的主要内容,如果未能解决你的问题,请参考以下文章

Netty源码笔记

netty笔记

学习笔记:python3,代码片段(2017)

Netty学习笔记:Netty核心模块组件

Netty笔记

Netty笔记2-Netty学习之NIO基础