Netty笔记
Posted 菜丸的程序屋
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty笔记相关的知识,希望对你有一定的参考价值。
字节跳动就是我心中的耶路撒冷!
netty介绍:
Netty是一个NIO框架,使用它可以简单快速地开发网络应用程序,比如客户端和服务端的协议。Netty大大简化了网络程序的开发过程比如TCP和UDP的 Socket的开发。
EventLoop
Event
Channel
ChannelPipeline
ChannelHandler
ChannelContextHandler
netty的组件
netty的辅助启动器,netty客户端和服务器的入口,Bootstrap是创建客户端连接的启动器,ServerBootstrap是监听服务端端口的启动器,跟tomcat的Bootstrap类似,程序的入口。
关联jdk原生socket的组件,常用的是NioserverSocketChannel和NioSocketChannel,NioServerSocketChannel负责监听一个tcp端口,有连接进来通过boss reactor创建一个NioSocketChannel将其绑定到worker reactor,然后worker reactor负责这个NioSocketChannel的读写等io事件。
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind.
A channel provides a user:
* the current state of the channel (e.g. is it open? is it connected?),
* the [configuration parameters](https://netty.io/4.1/api/io/netty/channel/ChannelConfig.html "interface in io.netty.channel") of the channel (e.g. receive buffer size),
* the I/O operations that the channel supports (e.g. read, write, connect, and bind), and
* the [`ChannelPipeline`](https://netty.io/4.1/api/io/netty/channel/ChannelPipeline.html "interface in io.netty.channel") which handles all I/O events and requests associated with the channel.
netty最核心的几大组件之一,ChannelHandler的容器,netty处理io操作的通道,与ChannelHandler组成责任链。write、read、connect等所有的io操作都会通过这个ChannelPipeline,依次通过ChannelPipeline上面的ChannelHandler处理,这就是netty事件模型的核心。ChannelPipeline内部有两个节点,head和tail,分别对应着ChannelHandler链的头和尾。
A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel.
ChannelPipeline implements an advanced form of the Intercepting Filter pattern to give a user full control over
how an event is handled and how the ChannelHandlers in a pipeline interact with each other.
使用ChannelHandlerContext在handler之间传递事件
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected!");
ctx.fireChannelActive();
}
}
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println("Closing ..");
ctx.close(promise);
}
}
Client端的代码实现
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Netty客户端的程序
* @author CaiWan
*/
public class Client {
/*IP地址*/
static final String HOST = System.getProperty("host", "127.0.0.1");
/*端口号*/
static final int PORT1 = Integer.parseInt(System.getProperty("port", "8765"));
static final int PORT2 = Integer.parseInt(System.getProperty("port", "8764"));
public static void main(String[] args) throws Exception {
EventLoopGroup workgroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();//客户端
b.group(workgroup)
.channel(NioSocketChannel.class)//客户端 -->NioSocketChannel
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {//handler
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new ClientHandler());
}
});
//创建异步连接 可添加多个端口
ChannelFuture cf1 = b.connect(HOST, PORT1).sync();
ChannelFuture cf2 = b.connect(HOST, PORT2).sync();
//buf
//client向server端发送数据 Buffer形式
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty".getBytes()));
cf2.channel().writeAndFlush(Unpooled.copiedBuffer("hello world".getBytes()));
cf1.channel().closeFuture().sync();
cf2.channel().closeFuture().sync();
workgroup.shutdownGracefully();
}
}
Servler端代码实现:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Netty实现的服务端程序
* @author CaiWan
*/
public class Server{
/*端口号*/
static final int PORT1 = Integer.parseInt(System.getProperty("port", "8765"));
static final int PORT2 = Integer.parseInt(System.getProperty("port", "8764"));
public static void main(String[] args){
EventLoopGroup bossGroup = null;
EventLoopGroup workerGroup = null;
ServerBootstrap b = null;
try{
//1:第一个线程组是用于接收Client连接的
bossGroup = new NioEventLoopGroup(); //(1)
//2:第二个线程组是用于实际的业务处理操作的
workerGroup = new NioEventLoopGroup();
//3:创建一个启动NIO服务的辅助启动类ServerBootstrap 就是对我们的Server进行一系列的配置
b = new ServerBootstrap();//(2)
//4:绑定两个线程组
b.group(bossGroup, workerGroup)
//5:需要指定使用NioServerSocketChannel这种类型的通道
.channel(NioServerSocketChannel.class)//(3) 服务端 -->NioServerSocketChannel
//6:一定要使用childHandler 去绑定具体的事件处理器
.childHandler(new ChannelInitializer<SocketChannel>() //(4) childHandler
{
protected void initChannel(SocketChannel sc) throws Exception
{
//7:将自定义的serverHandler加入到管道中去(多个)
sc.pipeline().addLast(new ServerHandler());//handler中实现真正的业务逻辑
// sc.pipeline().addLast(new ServerHandler());
// sc.pipeline().addLast(new ServerHandler());
}
})
/**
* 服务器端TCP内核模块维护两个队列,我们称之为A,B吧
* 客户端向服务端connect的时候,会发送带有SYN标志的包(第一次握手)
* 服务端收到客户端发来的SYN时,向客户端发送SYN ACK确认(第二次握手)
* 此时TCP内核模块把客户端连接加入到A队列中,最后服务端收到客户端发来的ACK时(第三次握手)
* TCP内核模块把客户端连接从A队列移到B队列,连接成功,应用程序的accept会返回
* 也就是说accept从B队列中取出完成三次握手的连接
* A队列和B队列的长度之和是backLog,当A,B队列的长度之和大于backLog时,新连接将会被TCP内核拒绝
* 所以,如果backLog过小,可能会出现accept速度跟不上,A,B队列满了,导致新的客户端无法连接,
* 要注意的是,backLog对程序支持的连接数并无影响,backLog影响的只是还没有被accept取出的连接
*/
//8:设置TCP连接的缓冲区
.option(ChannelOption.SO_BACKLOG, 128)//(5)
// .option(ChannelOption.SO_SNDBUF, 32*1024) //设置发送缓冲大小
// .option(ChannelOption.SO_RCVBUF, 32*1024) //设置接收缓冲大小
//9:保持连接
.childOption(ChannelOption.SO_KEEPALIVE, true);//(6)
//10:绑定指定的端口 进行监听
//此处端口号先写死 也可以绑定多个端口
ChannelFuture cf2= b.bind(PORT1).sync(); // (7)
ChannelFuture cf3= b.bind(PORT2).sync(); // (7) 绑定多个端口
//Thread.sleep(10000);
cf2.channel().closeFuture().sync(); //异步等待关闭
cf3.channel().closeFuture().sync(); //异步等待关闭
}catch(Exception e){
e.printStackTrace();
}finally{
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
实现数据传输的业务逻辑层代码的实现
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
/**
* 客户端业务处理类
* (编写主要的业务逻辑)
* @author CaiWan
*/
public class ClientHandler extends ChannelHandlerAdapter
{
// ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放。
// 请记住处理器的职责是释放所有传递到处理器的引用计数对象。
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
try{
//do something
//接收服务端发来的数据 ByteBuf
ByteBuf buf = (ByteBuf)msg;
//创建一个和buf一样长度的字节空数组
byte[] data = new byte[buf.readableBytes()];
//将buf中的数据读取到data数组中
buf.readBytes(data);
//将data数组惊醒包装 以String格式输出
String response = new String(data,"utf-8");
System.out.println("client :"+response);
//以上代码是接收服务端发来的反馈数据//
ctx.close();
}finally{
// Discard the received data silently.
ReferenceCountUtil.release(msg);
}
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
/**
* 服务端业务处理类
* (编写主要的业务逻辑)
* @author CaiWan
*/
public class ServerHandler extends ChannelHandlerAdapter
{
/**
* 每当从客户端收到新的数据时,这个方法会在收到消息时被调用
* ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放。
* 请记住处理器的职责是释放所有传递到处理器的引用计数对象。
*/
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
try{
//do something
//接收客户端发送的数据 ByteBuf
ByteBuf buf = (ByteBuf)msg;
//创建一个和buf长度一样的空字节数组
byte[] data = new byte[buf.readableBytes()];
//将buf中的数据读取到data数组中
buf.readBytes(data);
//将data数据包装成string输出
String request = new String(data,"utf-8");
System.out.println("server :"+request);
//以上代码是接收客户端信息//
//server端向client发送反馈数据
//如果是绑定了多个端口 那么都会进行发送
ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes()))
.addListener(ChannelFutureListener.CLOSE);//添加监听 当服务端向客户端发送完数据后,关闭connect连接
/**
* ChannelFutureListener,当一个写请求完成时通知并且关闭Channel
* 加上监听 意味着服务端回送数据到客户端时 连接关闭(短连接)
* 不加监听 意味着客户端与服务端一直保持连接状态(长连接)
*/
ctx.close();
}finally{
// Discard the received data silently.
ReferenceCountUtil.release(msg);
}
}
/**
* exceptionCaught()事件处理方法是当出现Throwable对象才会被调用
* 即当Netty由于IO错误或者处理器在处理事件时抛出的异常时
*/
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
客户端向服务器发送HelloNetty-Server!
服务器向客户端返回Hello Netty-Client!
服务端ChannelHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.println("服务器接收:"+in.toString(CharsetUtil.UTF_8));
in.clear();
String str = "Hello Netty-Client!";
in.writeBytes(str.getBytes());
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
服务端Server
import java.net.InetSocketAddress;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Server {
private final int port;
public Server(int port) {
this.port = port;
}
public static void main(String[] args) {
int port = 8989;
try {
new Server(port).start();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void start() throws InterruptedException {
final ServerHandler serverHandler = new ServerHandler();
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress( port))
.childHandler(new ChannelInitializer() {
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(serverHandler);
}
});
ChannelFuture future = bootstrap.bind().sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
客户端ChannelHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Netty-Server!",CharsetUtil.UTF_8));
}
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("客户端接收到消息:"+msg.toString(CharsetUtil.UTF_8));
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端Client
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client {
private final int port;
private final String host;
public Client(int port, String host) {
this.port = port;
this.host = host;
}
public static void main(String[] args) {
String host = "127.0.0.1";
int port = 8989;
try {
new Client(port, host).start();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture future = bootstrap.connect().sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
导入依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xm</groupId>
<artifactId>netty</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
</dependencies>
</project>
参考书籍:Netty实战(Norman Maurer & Marvin Allen Wolfthal)
参考官方文档:https://netty.io/wiki/user-guide.html
最后
以上是关于Netty笔记的主要内容,如果未能解决你的问题,请参考以下文章