详解BIOAIONIO Netty 知识点和工作原理

Posted 小吴吃肉啦~

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了详解BIOAIONIO Netty 知识点和工作原理相关的知识,希望对你有一定的参考价值。

Netty框架 基础

三大网络编程

BIO

同步阻塞:服务器实现模式一个连接一个线程,既客户端有连接请求时,服务器就需要启动一个线程进行处理,如果这个连接不任何事情会造成不必要的线程线程开销

适用场景: 连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解

NIO

**同步非阻塞:**服务器实现一个模式为一个线程处理多个请求(连接),即可达发送的连接请求,都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。

适用场景:连接数目多连接比较短,eg:聊天服务器,弹幕系,服务期间通讯。

基本介绍:是哟个线程从某个通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就说明都不会获取,而不是保持线程阻塞,所以直至数据变得可以读取之前,该线程可以继续做其他的事情。

AIO

异步非阻塞:AIO引入异步通道的概念,采用Proactor模式,简化了程序编写,有效的请求才其启动线程,它的特点是先由操作系统完成后才通知服务端程序去处理,一般适用于链接数较多的且连接时间较长的应用。

适用场景: 连接数目 且 链接比较 ,eg: 相册服务器

BIO编程简单流程

  1. 服务器端启动一个 ServiceSocket
  2. 客户端启动 Socket 对服务器进行通信,默认情况下服务器需要对每个客户 建立一个线程与之通讯
  3. 客户端发出请求后,先咨询服务器是否有线程响应,如果没有则还会等待,或者被拒绝
  4. 如果有响应,客户端线程会的等待请求结束后,在继续执行

服务器端:

package com.atguigu.bio;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Bioservice 
    public static void main(String[] args) throws Exception 
        //线程池机制

        //思路
        //1. 创建一个线程池
        //2   如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)

        //1. 创建一个线程池
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

        //创建一个serversocket
        ServerSocket serverSocket = new ServerSocket(6666);

        System.out.println("服务器启动");


        while (true) 

            // 监听,等待客户端连接
            final Socket socket = serverSocket.accept();
            System.out.println("连接到一个客户端");

            //就创建一个线程,与之通讯(单独写一个方法)
            newCachedThreadPool.execute(new Runnable() 
                public void run() 
                    handler(socket);
                
            );


        

    

    //编写一个handler方法,和客户端通讯
    public static void handler(Socket socket) 

        try 
            System.out.println("线程信息 id = " + Thread.currentThread().getId() +
                    "线程名字 =" + Thread.currentThread().getName());

            byte[] bytes = new byte[1024];
            //通过socker 获取输入流
            InputStream inputStream = socket.getInputStream();

            //循环的读取客户端的发送的数据
            while (true) 
                System.out.println("线程信息 id = " + Thread.currentThread().getId() +
                        "线程名字 =" + Thread.currentThread().getName());
                int read = inputStream.read(bytes);
                if (read != -1) 
                    System.out.print(new String(bytes, 0, read));//输出客户端发送的数据
                 else 
                    break;
                
            


         catch (IOException e) 
            e.printStackTrace();
         finally 
            System.out.println("关闭和client连接");

            try 
                socket.close();
             catch (IOException e) 
                e.printStackTrace();
            
        
    



客户端: cmd telnet 127.0.0.1 6666

NIO核心三大核心

NIO三大核心部分 : Channnel(通道) Buffer(缓冲区) Selector(选着器)

三者关系说明

  1. 每个channel 都会对应一个Buffer
  2. Selector对对应一个线程,一个线程可以对应多个channel
  3. 该图反应了有三个channel注册到 该selector
  4. 程序切换到哪个channel是有时间决定的,Event就是一个重要的概念
  5. Selector会根据不同的事件,在各个通道上切换
  6. Buffer就是一个内存块,底层是有一个数组
  7. 数据的读取写入是通过Buffer,这个和BIO,BIO中要么是输入流,或者是输出流 不能双向,但是NIO的Buffer 是可以读也可以写,需要flip方法的切换
  8. channel是双向的,可以返回底层操作系统的情况、

Buffer(缓冲区)

buffer子类(API)

  1. ByteBuffer,存储字节数据到缓冲区
  2. ShortBuffer,存储字符串数据到缓冲区
  3. CharBuffer,存储字符数据到缓冲区
  4. IntBuffer,存储整数数据到缓冲区
  5. LongBuffer,存储长整型数据到缓冲区
  6. DoubleBuffer,存储小数到缓冲区
  7. FloatBuffer,存储小数到缓冲区

Buffer支持类型化的put 和 get,put放入什么数据类型,get就应该使用响应的数据类型来取出,否则可嫩硅油 BufferUnderflowException 异常

buffer属性

属性描述
Capacity容量,即可以容纳的最大数量,在缓冲区创建时被设定并且不能改变
Limit表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的
Postion位置,下一个要被读写的元素的索引,每次读写缓冲区数据都会被改变数值,为下一次的读写准备
Mark标记

Channel(通道)

通道可以同时进行读写,而流只能读or之后hi能写

通道可以实现异步读写数据

通道可以从缓存读数据,也可以写数据到缓存

基本介绍:

  1. BIO 中的stream似乎单向的,例如FileInputStream对象只能进行读取数据的操作,而NIO中的通道(Channel)是双向的,可以读操作,也可以写操作
  2. Channel在NIO中是一个接口
  3. 常用的Channel 类有: FileChannel、DatagramChannel、ServerSocketChannel 和 SocketChannel
  4. FileChannel 用于文件数据的读写,DatagramChannel 用于UDP的数据读写,ServerSocketChannel 和SocketChannel 用于TCP的数据读写。

FileChannel类

​ FileChannel主要用来对本地文件进行IO操作,常见的方法有:

public int read(ByteBuffer dst) ,从通道读取数据并放到缓冲区

public int write(ByteBuffer src),把缓冲区的数据写到通道中

public long transferFrom(ReadableByteChannel src,long position,long count),从目标通道中复制数据到当前通道

public long tansferTo(long postion,long count,WritableByteChannel target),吧数据从当前通道复制给目标通道 

write案例

package com.atguigu.nio;

import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class NIOFileChannel01 
    public static void main(String[] args) throws Exception
        String str = "四达时代";

        //创建一个输出流
        FileOutputStream fileOutputStream = new FileOutputStream("D:\\\\file01.txt");

        //通过fileOutputStream 获取 对应FileChannel
        // 这个fileChannel 真实类型是 FileChannelImpl
        FileChannel fileChannel = fileOutputStream.getChannel();

        //创建一个缓冲区  ByteBuffer  再分配1024空间
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        //将 str 放入 Buffer
        byteBuffer.put(str.getBytes());

        //对buffer ,进行flip   功能反转
        byteBuffer.flip();

        //把缓冲区的数据 写入 FileChannel中
        fileChannel.write(byteBuffer);

        //关闭流
        fileOutputStream.close();
    


read案例

package com.atguigu.nio;

import java.io.File;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class NIOFileChannel02 
    public static void main(String[] args) throws Exception 

        File file = new File("D:\\\\file01.txt");
        FileInputStream fileInputStream = new FileInputStream(file);

        //通过fileInputStream 获取对应的ileChannel
        FileChannel channel = fileInputStream.getChannel();

        //创建缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());

        //将 通道的数据 读入到Buffer
        channel.read(byteBuffer);

        // 将字节转出 字符串
        System.out.println(new String(byteBuffer.array()));

        fileInputStream.close();
    


transferFrom案例

Selector(选择器)

Java的NIO,用非阻塞的IO方式,可以使用一个线程,处理多个的客户端链接,就会使用Selector(选择器)

Selector 能够检测多个组测的通道上是否有事件发生(注意:多个channel以事件的方式可以注册到同意哦个Selector),如果有事件发生,便获取事件然后针对每个事件进行响应的处理。这样就可以只用一个单线程曲棍里多个通道,也就是管理多个连接和请求。

只有在链接真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必每个连接都创建一个线程

避免了多线程之间的上下文切换导致额开销。

Selector 方法说明

监控所有注册的通道,当其中有io流

方法说明
open()得到一个选择器对象
selcet(long timeout)阻塞 xxxx时间,在xxxx毫秒后返回
select阻塞
wakeup()唤醒selector
selectorNow()不阻塞,立马返还

Seletor、SelectionKey、ServerScokerChannel和SocketChannle 关系图梳理

下图说明

  1. 当客户端连接时,会通过ServerSocketChannel 得到SocketChannel

  2. Seletor进行监听,select方法,返回有事件发生的通道个数

  3. 将socketChannel注册到Selector上,register(Selector sel,int ops)方法

    ,一个selector上可以注册多个SocketChannel

    register(Selector sel,int ops) Seletor 是对应的seletor ,ops 是xxx事件

  4. 注册后返回一个SeletionKey,会和该Seletor 关联(集合)

  5. Seletor 进行监听 select 方法,返回有事件发生的通道的个数

  6. 进一步得到各个SelectionKey(有事件发生)

  7. 在通过SelectionKey 反向获取SocketChannel,方法channel()

  8. 可以得到channel

Netty

Reactor

Reactor模式

解释:

  1. Reator模式,通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动)
  2. 服务器端程序处理传入的多个请求,并将他们同步分派到相应的处理线程
  3. Reacctor模式适用IO复用监听事件,收到事件后,分发给某个线程(进程),这个点就是网络服务器高并发处理关键

Reactor模式中核心组成:

  1. Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应。它就向公司的电话接线员,他接听来自客户的电话并将线路转移到适当的联系人。
  2. Handlers:处理程序执行IO事件完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor通过调度适当的处理程序响应的IO事件,处理程序执行非阻塞操作。

Reactor模式分类

  1. 单Reactor单线程
  2. 单Reactor多线程
  3. 主从Reactor多线程

单Reactor单线程

优点:模型简单、没有多线程、进程通信、竞争的问题,全部在一个线程中完成

缺点:性能问题,只有一个线程,无法完全发挥多核CPU的性能,

单Reactor多线程

说明:

  1. Reactor对象通过selelct监控客户端请求事件,收到事件后,通过dispatch进行分发
  2. 如果建立连接请求,然后创建一个Handler 对象处理完成连接后的各种事件
  3. 如果不是连接请求,则由reactor分发调用连接对应的handler来处理
  4. handler只负责响应事件,不做具体的业务处理,通过read读取数据会分发worker线程池的某个蓄电池去处理业务
  5. worker线程池会分配独立线程完成真正的业务,并将结果返回会给handler
  6. handler收到响应后,通过send将结果返回给client

优点:充分利用多核CPU的处理

缺点:多线程数据共享,访问比较复杂,reactor处理所有事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈

主从Reactor多线程

**工作原理:**针单Reactor多线程模型中,Reactor在单线程中运行,高并发场景下容易成为性能瓶颈,可以让Reactor在多线程中运行

说明:

  1. Reactor主线程MainReactor对象通过select监听连接事件,收到事件后,通过Acceptor处理连接事件
  2. 当Acceptor 处理连接事件后,MainReactor 将 连接分配给SubReactor
  3. subreactor 将连接加入到连接队列中进行监听,并创建handler进行各种事件处理
  4. 当有新的事件发生时,subreactoer 调用对应的handler处理
  5. handler通过read读取数据,分发给后面的worker线程处理
  6. worker线程池分配独立的wordler线程进行业务处理,并返回结果
  7. handler 收到响应的结果后,再通过send将结果返回给client
  8. Reactor主线程可以对应多个Reactor子线程

优点:父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理

​ 父线程与子线程的数据交互简单,Reactor主线程只需要把新连接传给子线程,子线程无需返回数据

缺点:编程复杂的较高

Reactor总结

  1. 响应快,不必为单个同步事件所阻塞,虽然Reactor本身依然是同步的可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程、进程的切换开销
  2. 扩张性好,可以方便的通过增加Reactor实例格式来充分利用CPU资源
  3. 复用性好,Reactor模型本身与具体事件处理逻辑无关,具有很高的复用性

Netty模型

简单版

说明:

  1. Boss线程维护Selector,只会关注Accecpt
  2. 当接受到Accept事件,获取到对应的SocketChannel,封装成NIOScoketChannel并注册到Worker线程(事件循环),并进行维护
  3. 当Worker线程监听到selector中通道发生自己感兴趣二点事件后,就进行处理(就由handler),注意handler 已经加入到通道

进阶版

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Y0Nb2Dap-1668606912814)(D:/Typora/%E7%AC%94%E8%AE%B0%E5%9B%BE/)]

详细版

说明:

  1. netty抽象出俩组线程池 BossGroup 专门负责接受客户端连接、WorkerGroup 专门负责网络的读写
  2. BossGroup 和 WorkerGroup 类型 都是NioEventLoopGroup
  3. NioEventLoopGroup 相当于一个事件循环组,这个组合含有多个事件循环,每一个事件循环时NioEventLoop
  4. NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket的网络通信
  5. NioEventLoopGroup 可以有多个线程,既可以含有多个NioEventLoop
  6. 每个BossNioEventLoop执行的步骤有3步
    1. 轮询accept事件
    2. 处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个worker NioWEventLoop上的select
    3. 处理任务队列的任务,即runAllTasks
  7. 每个Worker NIOEventLoop循环执行的的步骤
    1. 轮询read、write 事件
    2. 处理io事件,即read、write事件,在对应NioSocketChannel处理
    3. 处理任务队列的任务,即runAllTasks
  8. 每个Worker NIOEventLoop 处理业务时,会使用pipeline(管道),pipeline中包含了channel,即通过pipeline可以获取到对应通道,管道中维护了很多的处理器

上述流程代码

Netty客户端,

package com.atguigu.netty.simple;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


public class NettyClient 

    public static void main(String[] args) throws Exception 
        //客户端只需要一个事件循环组
        NioEventLoopGroup group = new NioEventLoopGroup();

        try 
            //创建客户端启动对象
            //注意客户端使用的不是 ServerBootSTRAP  而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();

            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) //设置看客户端通道的实现类(反射)
                    .handler(new ChannelInitializer<SocketChannel>() 

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception 
                            ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
                        
                    );

            System.out.println("客户端 OK");

            //启动客户端 去连接服务器端
            //关于
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            //给关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
         finally 
            group.shutdownGracefully();
        
    


Netty客户端Handler (会被客户端调用)

package com.atguigu.netty.simple;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClientHandler extends ChannelInboundHandlerAdapter 


    //当通道就绪就会触发该方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        System.out.println("client" + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,服务器", CharsetUtil.UTF_8));
    

    //当通道有读取事件时,会触发
    /**
     * @param ctx 上下文对象,含有管道 pipeline (管道里面含有好多handler) ,通道 channel(通道注重读和写), 地址
     * @param msg 就是客户端发送的数据  ,默认是Object
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        System.out.println("server ctx =" + ctx);
        //将msg 转换ByteBuf
        //ByteBuf 是 Netty 提供的 , 不是NIO 的ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器端地址:" + ctx.channel().remoteAddress());
    

    //处理异常.一般需要关闭通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        cause.printStackTrace();
        ctx.close();
    


Netty服务器端,

package com.atguigu.netty.simple;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
//服务端1
public以上是关于详解BIOAIONIO Netty 知识点和工作原理的主要内容,如果未能解决你的问题,请参考以下文章

Pipeline的入站流程详解(netty源码死磕7)

如何使用Netty技术设计一个百万级的消息推送系统 原 荐

MQTT---HiveMQ源码详解Netty-MQTT消息事件处理(流程)

Netty详解ByteBuf

Netty基础必备知识,ByteBuffer和ByteBuf底层原理

Netty基础必备知识,ByteBuffer和ByteBuf底层原理