Netty网络编程实战3,使用Netty远程传输文件

Posted 哪 吒

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty网络编程实战3,使用Netty远程传输文件相关的知识,希望对你有一定的参考价值。

目录

一、Netty中的一些常见关键字

1、EventLoopGroup

事件循环组,是一个线程池,也是一个死循环,用于不断地接收用户请求;

2、ServerBootstrap

服务端启动引导类。

3、ChannelFuture

保存Channel异步操作的结果

4、ServerSocketChannel

一个可以监听新进来的TCP连接的通道。

5、group

设置循环组

6、channel

设置服务端的通道实现

7、option

给channel添加配置

8、childHandler

设置业务处理类

9、sync()

直到连接返回,才会继续后面的执行,否则阻塞当前线程

10、ChannelPipeline

可以把ChannelPipeline看成是一个ChandlerHandler的链表,当需要对Channel进行某种处理的时候,Pipeline负责依次调用每一个Handler进行处理。

11、addLast

类似Spring的beanFactory给每个bean起名字

12、ChannelHandlerContext

  1. 先来看一下ChannelHandlerContext的定义:public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker
  2. 首先ChannelHandlerContext是一个AttributeMap,可以用来存储多个数据;
  3. 然后ChannelHandlerContext继承了ChannelInboundInvoker和ChannelOutboundInvoker,可以触发inbound和outbound的一些方法;
  4. 除了继承来的一些方法之外,ChannelHandlerContext还可以作为channel,handler和pipline的沟通桥梁,因为可以从ChannelHandlerContext中获取到对应的channel,handler和pipline,Channel channel();ChannelHandler handler();ChannelPipeline pipeline();
  5. ChannelHandlerContext返回一个EventExecutor,用来执行特定的任务;

13、RandomAccessFile

RandomAccessFile是Java 输入/输出流体系中功能最丰富的文件内容访问类,它提供了众多的方法来访问文件内容,它既可以读取文件内容,也可以向文件输出数据。与普通的输入/输出流不同的是,RandomAccessFile支持"随机访问"的方式,程序可以直接跳转到文件的任意地方来读写数据。

14、ObjectDecoder

Object编解码器,Object编解码器是自带防止拆包、粘包功能,Object编解码器能够将对象转成流传输,然后又能将流转成对象,这个原理是因为在编码的时候,它将对象的类名也传输了过去,所以才能做到精准解码成对应的对象。

15、weakCachingConcurrentResolver

使用weakCachingConcurrentResolver创建线程安全的WeakReferenceMap对类加载器进行缓存,它支持多线程并发访问,当虚拟机内存不足时,会释放缓存中的内存,防止内存泄漏。

16、writeAndFlush

写队列并刷新。

17、ChannelOption参数详解

(1)ChannelOption.SO_BACKLOG

ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数。函数listen(int socketfd, int backlog)用来初始化服务端可连接队列。

服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小。

(2)ChannelOption.SO_REUSEADDR

ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口。

比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用。

比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR就无法正常使用该端口。

(3)ChannelOption.SO_KEEPALIVE

Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。

当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。

(4)ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF

ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数用于操作发送缓冲区大小和接受缓冲区大小。

接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。

(5)ChannelOption.SO_LINGER

ChannelOption.SO_LINGER参数对应于套接字选项中的SO_LINGER,Linux内核默认的处理方式是当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发送剩余的数据,造成了数据的不确定性,使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送。

(6)ChannelOption.TCP_NODELAY

ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。

Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。

而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。

二、服务端

1、实体类

package com.guor.demo.send;

import lombok.Data;

import java.io.File;
import java.io.Serializable;

@Data
public class SendFile implements Serializable 

    private static final long serrialVersionUID = 1L;

    private File file;
    private String fileName;
    private int start;
    private int end;
    private byte[] bytes;

2、主程序类

package com.guor.demo.send;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioserverSocketChannel;

/**
 * 主程序类
 */
public class FileNettyServerTest 
    public static void main(String[] args) throws InterruptedException 
        /**
         * EventLoopGroup:事件循环组,是一个线程池,也是一个死循环,用于不断地接收用户请求;
         * serverGroup:用户监听及建立连接,并把每一个连接抽象为一个channel,最后再将连接交给clientGroup处理;
         * clientGroup:真正的处理连接
         */
        EventLoopGroup serverGroup = new NioEventLoopGroup();
        EventLoopGroup clientGroup = new NioEventLoopGroup();
        try 
            // ServerBootstrap:服务端启动引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 1、将serverGroup和clientGroup注册到服务端的Channel上;
            // 2、注册一个服务端的初始化器MyNettyServerInitializer;
            // 3、该初始化器中的initChannel()方法会在连接被注册到Channel后立刻执行;
            // 5、最后将端口号绑定到8080;
            // ChannelFuture用来保存Channel异步操作的结果
            ChannelFuture channelFuture = serverBootstrap.group(serverGroup, clientGroup)//group用来设置循环组
                    .channel(NioServerSocketChannel.class)//channel用来设置服务端的通道实现,ServerSocketChannel是一个可以监听新进来的TCP连接的通道
                    .option(ChannelOption.SO_BACKLOG,1024)//option用来给channel添加配置
                    // childHandler用来设置业务处理类
                    // sync()的作用是“直到连接返回,才会继续后面的执行,否则阻塞当前线程”
                    .childHandler(new FileNettyServerInitializer()).bind(8080).sync();
            channelFuture.channel().closeFuture().sync();
        catch (Exception e)
            System.out.println(e);
        finally 
            serverGroup.shutdownGracefully();
            clientGroup.shutdownGracefully();
        
    

3、自定义初始化器

package com.guor.demo.send;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * 自定义初始化器
 */
public class FileNettyServerInitializer extends ChannelInitializer<SocketChannel> 

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception 
        // 可以把ChannelPipeline看成是一个ChandlerHandler的链表,
        // 当需要对Channel进行某种处理的时候,Pipeline负责依次调用每一个Handler进行处理。
        ChannelPipeline pipeline = socketChannel.pipeline();

        // addLast类似Spring的beanFactory给每个bean起名字
        pipeline.addLast(new ObjectEncoder());
        pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null)));

        // 增加自定义处理器FileNettyServerHandler,用于实际处理请求,并给出响应
        pipeline.addLast(new FileNettyServerHandler());
    


4、自定义处理器

package com.guor.demo.send;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.io.File;
import java.io.RandomAccessFile;

/**
 * 自定义处理器
 */
public class FileNettyServerHandler extends SimpleChannelInboundHandler 

    private int readLength;
    private int start;
    private String receivePath = "H:\\\\CSDN\\\\netty\\\\nio";

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception 
        if(msg instanceof SendFile)
            SendFile sendFile = (SendFile)msg;
            byte[] bytes = sendFile.getBytes();
            readLength = sendFile.getEnd();
            String fileName = sendFile.getFileName();
            String path = receivePath + File.separator + fileName;
            File file = new File(path);

            /**
             * RandomAccessFile是Java 输入/输出流体系中功能最丰富的文件内容访问类,它提供了众多的方法来访问文件内容,
             * 它既可以读取文件内容,也可以向文件输出数据。与普通的输入/输出流不同的是,
             * RandomAccessFile支持"随机访问"的方式,程序可以直接跳转到文件的任意地方来读写数据。
             */
            RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
            randomAccessFile.seek(start);
            randomAccessFile.write(bytes);
            start = start + readLength;
            if(readLength > 0)
                ctx.writeAndFlush(start);
                randomAccessFile.close();
            else 
                ctx.flush();
                ctx.close();
            
        
    

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception
        super.channelInactive(ctx);
        ctx.flush();
        ctx.close();
    


三、客户端

1、主程序类

package com.guor.demo.send;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.File;

/**
 * 主程序类
 */
public class FileNettyClientTest 
    public static void connect(int port, String host, final SendFile fileUploadFile) throws Exception
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try 
            // 客户端启动时的初始化操作
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .handler(new FileNettyClientInitializer(fileUploadFile));
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        catch (Exception e)
            System.out.println(e);
        finally 
            eventLoopGroup.shutdownGracefully();
        
    

    public static void main(String[] args) throws Exception 
        int port = 8080;
        if(args != null && args.length > 0)
            port = Integer.valueOf(args[0]);
        
        SendFile sendFile = new SendFile();
        File file = new File("H:\\\\KJ66X\\\\JWFS.rar");
        String fileName = file.getName();
        sendFile.setFile(file);
        sendFile.setFileName(fileName);
        sendFile.setStart(0);
        connect(port,"127.0.0.1",sendFile);
    


2、自定义初始化器

package com.guor.demo.send;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * 自定义初始化器
 */
public class FileNettyClientInitializer extends ChannelInitializer<SocketChannel> 

    SendFile sendFile;

    public FileNettyClientInitializer(SendFile sendFile)
        this.sendFile = sendFile;
    

    @Override
    protected void initChannel(SocketChannel sc) throws Exception 
        ChannelPipeline pipeline = sc.pipeline();
        pipeline.addLast(new ObjectEncoder());
        // ObjectDecoder:Object编解码器
        // 使用weakCachingConcurrentResolver创建线程安全的WeakReferenceMap对类加载器进行缓存,
        // 它支持多线程并发访问,当虚拟机内存不足时,会释放缓存中的内存,防止内存泄漏。
        pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)));
        // 自定义处理器
        pipeline.addLast(new FileNettyClientHandler(sendFile));
    


3、自定义处理器

package com.guor.demo.send;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.io.IOException;
import java.io.RandomAccessFile;

/**
 * 自定义处理器
 */
public class FileNettyClientHandler extends SimpleChannelInboundHandler 

    private int readLength;
    private int start = 0;
    private int lastLength = 0;
    private RandomAccessFile randomAccessFile;
    private SendFile sendFile;
    public FileNettyClientHandler(SendFile sendFile)
        this.sendFile = sendFile;
    

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception 
        super.channelInactive(ctx);
        System.out.println("客户端 - 文件发送完毕");
    

    @Override
    public void channelActive(ChannelHandlerContext ctx) 
        try 
            randomAccessFile = new RandomAccessFile(sendFile.getFile(),"r");
            randomAccessFile.seek(sendFile.getStart());
            lastLength = 1024 * 1024;
            byte[] bytes = new byte[lastLength];
            if((readLength = randomAccessFile.read(bytes)) != -1)
                sendFile.setEnd(readLength);
                sendFile.setBytes(bytes);
                // writeAndFlush: 写队列并刷新
                ctx.writeAndFlush(sendFile);
            
         catch (IOException e) 
            System.out.println(e);
        
    

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception 
        if(msg instanceof Netty网络编程实战4,使用Netty实现心跳检测机制

Netty网络编程实战2,使用Netty开发聊天室功能

Netty网络编程实战2,使用Netty开发聊天室功能

Netty实战

实战Netty系列之Netty高性能之道

Day480.Netty手写dubboRPC框架 -netty