Netty网络编程实战3,使用Netty远程传输文件
Posted 哪 吒
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty网络编程实战3,使用Netty远程传输文件相关的知识,希望对你有一定的参考价值。
目录
一、Netty中的一些常见关键字
1、EventLoopGroup
事件循环组,是一个线程池,也是一个死循环,用于不断地接收用户请求;
2、ServerBootstrap
服务端启动引导类。
3、ChannelFuture
保存Channel异步操作的结果
4、ServerSocketChannel
一个可以监听新进来的TCP连接的通道。
5、group
设置循环组
6、channel
设置服务端的通道实现
7、option
给channel添加配置
8、childHandler
设置业务处理类
9、sync()
直到连接返回,才会继续后面的执行,否则阻塞当前线程
10、ChannelPipeline
可以把ChannelPipeline看成是一个ChandlerHandler的链表,当需要对Channel进行某种处理的时候,Pipeline负责依次调用每一个Handler进行处理。
11、addLast
类似Spring的beanFactory给每个bean起名字
12、ChannelHandlerContext
- 先来看一下ChannelHandlerContext的定义:
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker
; - 首先ChannelHandlerContext是一个AttributeMap,可以用来存储多个数据;
- 然后ChannelHandlerContext继承了ChannelInboundInvoker和ChannelOutboundInvoker,可以触发inbound和outbound的一些方法;
- 除了继承来的一些方法之外,ChannelHandlerContext还可以作为channel,handler和pipline的沟通桥梁,因为可以从ChannelHandlerContext中获取到对应的channel,handler和pipline,
Channel channel();ChannelHandler handler();ChannelPipeline pipeline();
; - ChannelHandlerContext返回一个EventExecutor,用来执行特定的任务;
13、RandomAccessFile
RandomAccessFile是Java 输入/输出流体系中功能最丰富的文件内容访问类,它提供了众多的方法来访问文件内容,它既可以读取文件内容,也可以向文件输出数据。与普通的输入/输出流不同的是,RandomAccessFile支持"随机访问"的方式,程序可以直接跳转到文件的任意地方来读写数据。
14、ObjectDecoder
Object编解码器,Object编解码器是自带防止拆包、粘包功能,Object编解码器能够将对象转成流传输,然后又能将流转成对象,这个原理是因为在编码的时候,它将对象的类名也传输了过去,所以才能做到精准解码成对应的对象。
15、weakCachingConcurrentResolver
使用weakCachingConcurrentResolver创建线程安全的WeakReferenceMap对类加载器进行缓存,它支持多线程并发访问,当虚拟机内存不足时,会释放缓存中的内存,防止内存泄漏。
16、writeAndFlush
写队列并刷新。
17、ChannelOption参数详解
(1)ChannelOption.SO_BACKLOG
ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数。函数listen(int socketfd, int backlog)用来初始化服务端可连接队列。
服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小。
(2)ChannelOption.SO_REUSEADDR
ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口。
比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用。
比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR就无法正常使用该端口。
(3)ChannelOption.SO_KEEPALIVE
Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。
当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
(4)ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数用于操作发送缓冲区大小和接受缓冲区大小。
接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。
(5)ChannelOption.SO_LINGER
ChannelOption.SO_LINGER参数对应于套接字选项中的SO_LINGER,Linux内核默认的处理方式是当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发送剩余的数据,造成了数据的不确定性,使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送。
(6)ChannelOption.TCP_NODELAY
ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
二、服务端
1、实体类
package com.guor.demo.send;
import lombok.Data;
import java.io.File;
import java.io.Serializable;
@Data
public class SendFile implements Serializable
private static final long serrialVersionUID = 1L;
private File file;
private String fileName;
private int start;
private int end;
private byte[] bytes;
2、主程序类
package com.guor.demo.send;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioserverSocketChannel;
/**
* 主程序类
*/
public class FileNettyServerTest
public static void main(String[] args) throws InterruptedException
/**
* EventLoopGroup:事件循环组,是一个线程池,也是一个死循环,用于不断地接收用户请求;
* serverGroup:用户监听及建立连接,并把每一个连接抽象为一个channel,最后再将连接交给clientGroup处理;
* clientGroup:真正的处理连接
*/
EventLoopGroup serverGroup = new NioEventLoopGroup();
EventLoopGroup clientGroup = new NioEventLoopGroup();
try
// ServerBootstrap:服务端启动引导类
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 1、将serverGroup和clientGroup注册到服务端的Channel上;
// 2、注册一个服务端的初始化器MyNettyServerInitializer;
// 3、该初始化器中的initChannel()方法会在连接被注册到Channel后立刻执行;
// 5、最后将端口号绑定到8080;
// ChannelFuture用来保存Channel异步操作的结果
ChannelFuture channelFuture = serverBootstrap.group(serverGroup, clientGroup)//group用来设置循环组
.channel(NioServerSocketChannel.class)//channel用来设置服务端的通道实现,ServerSocketChannel是一个可以监听新进来的TCP连接的通道
.option(ChannelOption.SO_BACKLOG,1024)//option用来给channel添加配置
// childHandler用来设置业务处理类
// sync()的作用是“直到连接返回,才会继续后面的执行,否则阻塞当前线程”
.childHandler(new FileNettyServerInitializer()).bind(8080).sync();
channelFuture.channel().closeFuture().sync();
catch (Exception e)
System.out.println(e);
finally
serverGroup.shutdownGracefully();
clientGroup.shutdownGracefully();
3、自定义初始化器
package com.guor.demo.send;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
* 自定义初始化器
*/
public class FileNettyServerInitializer extends ChannelInitializer<SocketChannel>
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception
// 可以把ChannelPipeline看成是一个ChandlerHandler的链表,
// 当需要对Channel进行某种处理的时候,Pipeline负责依次调用每一个Handler进行处理。
ChannelPipeline pipeline = socketChannel.pipeline();
// addLast类似Spring的beanFactory给每个bean起名字
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null)));
// 增加自定义处理器FileNettyServerHandler,用于实际处理请求,并给出响应
pipeline.addLast(new FileNettyServerHandler());
4、自定义处理器
package com.guor.demo.send;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.io.File;
import java.io.RandomAccessFile;
/**
* 自定义处理器
*/
public class FileNettyServerHandler extends SimpleChannelInboundHandler
private int readLength;
private int start;
private String receivePath = "H:\\\\CSDN\\\\netty\\\\nio";
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception
if(msg instanceof SendFile)
SendFile sendFile = (SendFile)msg;
byte[] bytes = sendFile.getBytes();
readLength = sendFile.getEnd();
String fileName = sendFile.getFileName();
String path = receivePath + File.separator + fileName;
File file = new File(path);
/**
* RandomAccessFile是Java 输入/输出流体系中功能最丰富的文件内容访问类,它提供了众多的方法来访问文件内容,
* 它既可以读取文件内容,也可以向文件输出数据。与普通的输入/输出流不同的是,
* RandomAccessFile支持"随机访问"的方式,程序可以直接跳转到文件的任意地方来读写数据。
*/
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
randomAccessFile.seek(start);
randomAccessFile.write(bytes);
start = start + readLength;
if(readLength > 0)
ctx.writeAndFlush(start);
randomAccessFile.close();
else
ctx.flush();
ctx.close();
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
super.channelInactive(ctx);
ctx.flush();
ctx.close();
三、客户端
1、主程序类
package com.guor.demo.send;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.File;
/**
* 主程序类
*/
public class FileNettyClientTest
public static void connect(int port, String host, final SendFile fileUploadFile) throws Exception
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try
// 客户端启动时的初始化操作
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new FileNettyClientInitializer(fileUploadFile));
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
catch (Exception e)
System.out.println(e);
finally
eventLoopGroup.shutdownGracefully();
public static void main(String[] args) throws Exception
int port = 8080;
if(args != null && args.length > 0)
port = Integer.valueOf(args[0]);
SendFile sendFile = new SendFile();
File file = new File("H:\\\\KJ66X\\\\JWFS.rar");
String fileName = file.getName();
sendFile.setFile(file);
sendFile.setFileName(fileName);
sendFile.setStart(0);
connect(port,"127.0.0.1",sendFile);
2、自定义初始化器
package com.guor.demo.send;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
* 自定义初始化器
*/
public class FileNettyClientInitializer extends ChannelInitializer<SocketChannel>
SendFile sendFile;
public FileNettyClientInitializer(SendFile sendFile)
this.sendFile = sendFile;
@Override
protected void initChannel(SocketChannel sc) throws Exception
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast(new ObjectEncoder());
// ObjectDecoder:Object编解码器
// 使用weakCachingConcurrentResolver创建线程安全的WeakReferenceMap对类加载器进行缓存,
// 它支持多线程并发访问,当虚拟机内存不足时,会释放缓存中的内存,防止内存泄漏。
pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)));
// 自定义处理器
pipeline.addLast(new FileNettyClientHandler(sendFile));
3、自定义处理器
package com.guor.demo.send;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.io.IOException;
import java.io.RandomAccessFile;
/**
* 自定义处理器
*/
public class FileNettyClientHandler extends SimpleChannelInboundHandler
private int readLength;
private int start = 0;
private int lastLength = 0;
private RandomAccessFile randomAccessFile;
private SendFile sendFile;
public FileNettyClientHandler(SendFile sendFile)
this.sendFile = sendFile;
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
super.channelInactive(ctx);
System.out.println("客户端 - 文件发送完毕");
@Override
public void channelActive(ChannelHandlerContext ctx)
try
randomAccessFile = new RandomAccessFile(sendFile.getFile(),"r");
randomAccessFile.seek(sendFile.getStart());
lastLength = 1024 * 1024;
byte[] bytes = new byte[lastLength];
if((readLength = randomAccessFile.read(bytes)) != -1)
sendFile.setEnd(readLength);
sendFile.setBytes(bytes);
// writeAndFlush: 写队列并刷新
ctx.writeAndFlush(sendFile);
catch (IOException e)
System.out.println(e);
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception
if(msg instanceof Netty网络编程实战4,使用Netty实现心跳检测机制