水文之浅谈Netty线程模型

Posted Huterox

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了水文之浅谈Netty线程模型相关的知识,希望对你有一定的参考价值。

文章目录

前言

看到这个标题,可能有小伙伴要奇怪了,你的TSP三个解法的系列博文去哪了?好吧,我承认我有鸽的成分,今天只是想要水一篇博文(狗头)。但是文章的质量也确实是要保证的,所以,虽然不是这个使用强化学习解决TSP问题的算法,但是咱们今天的内容也确实是比较基本,比较常见的玩意。

那么开始之前咧,我们先随便聊聊阻塞和非阻塞,毕竟就这么来的玩意。

基本概念

那么谈到这个玩意的话,所涉及的玩意无非就这几个,同步异步阻塞,之后是BIO,NIO,AIO

同步

同步异步 , 举个例子来说,一家餐厅吧来了5个客人,同步的意思就是说,来第一个点菜,点了个鱼,好, 厨师去捉鱼杀鱼,过了半小时鱼好了给第一位客人,开始下位一位客人,就这样一个一个来,按顺序来。

异步

异步呢,异步的意思就是来第一位客人,点什么,点鱼,给它一个牌子,让他去一边等吧,下一位客人接着点菜,点完接着点让厨师做去吧,哪个的菜先好就先端出来,

同步的优点是:同步是按照顺序一个一个来,不会乱掉,更不会出现上面代码没有执行完就执行下面的代码, 缺点:是解析的速度没有异步的快;

异步的优点是:异步是接取一个任务,直接给后台,在接下一个任务,一直一直这样,谁的先读取完先执行谁的, 缺点:没有顺序 ,谁先读取完先执行谁的 。

同步其实说白了就是专一,一件事情一件事情去做,老老实实把这件事情做好了才去做现下一件事情,那么异步就是影分身,同时处理多个事情,他可能是使用多线程的方案,也可能是采用轮训的方案,例如海王,他就是属于轮训方案(开个玩笑话,你痛恨海王只是可恶自己不能成为海王(狗头))。

阻塞

说完,那么来说说阻塞是神马玩意,这个其实也简单,先前咱们的同步,异步其实是相对于任务来说的,而咱们的阻塞其实是相对于资源来说的,例如我们的IO资源。我们在这里其实做一个区分的及时,我们的阻塞是对资源的调度,同步,异步是指应用层对资源的使用。例如我们同步获取一个资源,并且必须等待资源加载完毕,那么就叫做同步阻塞,类似于这样,我们拿这个举例子:

for 事件 in 一堆事情:
	通知厨师做菜
	等待菜品做好
	端出去给顾客

for这段是同步的,是在说明调用资源。里面等待菜品做好就是在等待资源,做菜就是对资源进行处理。

非阻塞是这样的:

for 事件 in 一堆事情:
	通知厨师做菜
	告诉顾客在做菜了

之后:

厨师:
	接到通知
	做好菜品
	按下响铃
	送去菜品

也就是非阻塞,他是有一个回调处理的。这个回调是通知程序做好了,资源结束了(IO)
其实用这个图会更好理解:

ok,那么说完了这些之后呢,我们就可以来说说接下来这几个个东西了:
首先是我们的:

BIO

这个东西其实说白了就是这个:

for 事件 in 一堆事情:
	通知厨师做菜
	等待菜品做好
	端出去给顾客

同步调用资源。


不过值得一提的是,在BIO里面其实还有一个伪异步IO(异步阻塞)。其实就是开了一个线程池,实现一对多,类似于影分身,这个和海王最大的区别是,海王只能做到简单聊天的异步,但是这个可以做到视频的异步,也就是影分身,多开,当然对资源的消耗是显而易见的。当然还是对IO操作。

线程池
for 事件 in 一堆事情:
	将事件添加至线程池
线程池执行里面的事件

也就是说:

事件:
	通知厨师做菜
	等待菜品做好
	端出去给顾客

案例(聊天室)

这个的话,比较容易搞混,其实也简单,你就想想多开助手嘛。那么关于这个BIO是有一个案例的,也就是最简单的聊天室的一个案例。那么关于NIO的话如果是一年前,给你手撸NettyLite都可以,现在不行了,事情太多了,后面咱们在WhiteHole开发的时候整合部分就能看到了,Netty实时聊天,消息推送+MQ异步存储(当然MQ不一定有必要,但是LZ连ES都用了再加一个又何妨,做戏做全套嘛)。OK,那么咱们的这个案例的话,是这样的(也是以前的老代码了)

流程

流程是这样的:

消息定义

明确了这个玩意的话,咱们来看看这个消息的定义,毕竟是聊天嘛:

import java.io.Serializable;

public class Message implements Serializable
    private String from;//消息发送者
    private String to;//消息接收者
    private int type;//消息类型
    private String info;//发送的消息内容

    public Message()

    
    public Message(String from, String to, int type, String info) 
        this.from = from;
        this.to = to;
        this.type = type;
        this.info = info;
    
    public String getFrom() 
        return from;
    
    public void setFrom(String from) 
        this.from = from;
    
    public String getTo() 
        return to;
    
    public void setTo(String to) 
        this.to = to;
    
    public int getType() 
        return type;
    
    public void setType(int type) 
        this.type = type;
    
    public String getInfo() 
        return info;
    
    public void setInfo(String info) 
        this.info = info;
    
    



在这里的话还有一个Type定义:

public final class MessageType 
    public static final int TYPE_LOGIN = 0x1;//登录的消息类型
    public static final int TYPE_SEND = 0x2; //发送的消息类型


服务端代码

这个代码就很简陋了,但是能看:


import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class Server 
    //服务器端负责数据的中转
    public static void main(String[] args) 
        ExecutorService es = Executors.newFixedThreadPool(6);//线程池大小最多同时支持6个人聊天
        Vector<UserThread> vector = new Vector<>();//存储客户端线程
        try
        ServerSocket server = new ServerSocket(8888);
        System.out.println("服务器已启动......");
        while(true)
            Socket socket = server.accept();
            UserThread user = new UserThread(socket,vector);
            es.execute(user);

        
    catch(IOException e)
        e.printStackTrace();
    
        
    
    


class UserThread implements Runnable
    private Socket socket;
    private Vector<UserThread> vector;
    private String name;
    private ObjectInputStream ois;
    private ObjectOutputStream oos;
    private boolean flag=true;
    public UserThread(Socket socket, Vector<UserThread> vector) 
        this.socket = socket;
        this.vector = vector;
        this.vector.add(this);
    
    public UserThread() 
    
    @Override
    public void run() 
        // TODO Auto-generated method stub
        try
        System.out.println("[Host:"+socket.getInetAddress().getHostAddress()+"-连接]");
        ois = new ObjectInputStream(socket.getInputStream());
        oos = new ObjectOutputStream(socket.getOutputStream());
        while(flag)

            Message msg = (Message) ois.readObject();
            int type = msg.getType();
            switch(type)
                case MessageType.TYPE_LOGIN:
                    //登录
                    this.name = msg.getFrom();
                    msg.setInfo("欢迎你:"+this.name);
                    oos.writeObject(msg);
                    break;
                
                case MessageType.TYPE_SEND:
                    //发送消息
                    String sendto = msg.getTo();
                    for(UserThread user:vector)
                        if(sendto.equals(user.name)&&user!=this)
                            //寻找
                            user.oos.writeObject(msg);
                            System.out.println(this.name+"send message to"+msg.getTo()+"--success");
                            break;
                        
                        else
                            System.out.println(this.name+"send message to"+msg.getTo()+"--fail");
                        
                    
                    break;
                
            
        
        oos.close();
        ois.close();
        catch(IOException | ClassNotFoundException e)
            // e.printStackTrace();
            System.out.println("Closed"+name);
            return;
        

        
    
    



服务端做的事情很简单,首先我们有一个线程池,还有一个定义的用户线程类。当链接过来的时候,这个线程类可以获取到用户的IO流,并且保存链接,也就是这个流不释放(里面有个White),同时将这个线程类装载到咱们的统一管理的列表里面,方便管理,数据互通。

客户端

之后是我们的客户端,同样的,他有两个点,一个是发送消息,一个是时刻监听咱们服务器的消息。

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Client 
    public static void main(String[] args) 
        ExecutorService es = Executors.newSingleThreadExecutor();
        Scanner input  = new Scanner(System.in);
        try
            Socket socket = new Socket("localhost",8888);
            System.out.println("已连接至服务器");
            ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
            ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
            //登录
            System.out.print("请输入用户名:");
            String username = input.nextLine();
            Message msg = new Message(username,null,MessageType.TYPE_LOGIN,null);
            oos.writeObject(msg);
            msg = (Message)ois.readObject();
            System.out.println(msg.getInfo());
            //与其他用户接收发送交流消息
            //读取其他用户的消息
            // new Thread(new ReadInfoThread(ois)).start();
            es.execute(new ReadInfoThread(ois));
            //发送直接在主线程发送
            boolean flag = true;
            while(flag)
                msg = new Message();
                System.out.print("To:");
                msg.setTo(input.nextLine());
                msg.setFrom(username);
                msg.setType(MessageType.TYPE_SEND);
                System.out.print("\\nmsg:");
                msg.setInfo(input.nextLine());
                oos.writeObject(msg);
            
        catch(IOException | ClassNotFoundException e)
            e.printStackTrace();
        
    


class ReadInfoThread implements Runnable
    private ObjectInputStream ois;
    private boolean flag = true;
    public ReadInfoThread(ObjectInputStream ois)
        this.ois = ois;
    
    @Override
    public void run() 
        // TODO Auto-generated method stub
        try 

            while(flag)
            Message msg=(Message)ois.readObject();
            System.out.println();
            System.out.println("From:["+msg.getFrom()+"]say:"+msg.getInfo());
            System.out.print("To:");
            
            if(ois!=null)
                ois.close();
            

         catch (ClassNotFoundException e) 
            // TODO Auto-generated catch block
            e.printStackTrace();
         catch (IOException e) 
            // TODO Auto-generated catch block
            e.printStackTrace();
        
        
    



效果是这样的:

NIO

之后是俺们的NIO了,它是同步非阻塞的。其实这个玩意和咱们刚刚的聊天的玩意最大的区别就是这样的,首先是利用到了咱们JAVA的一个特性,也就是NIO的一个特性Channel。

说白了就是这个

从而代替了线程池,先前咱们用的是Steam,所以需要那个玩意来维持双向数据传输。并且没有双向,我还得等也就是在线程内确定数据传输完毕,也就是通过异步的形式来处理,但是Channel他是非阻塞的读取,之后切换到下一个Channel从而完成处理。

原因大概是这样的:

AIO

这个是在咱们NIO的基础上再做处理,叫做异步非阻塞。NIO实现了非阻塞。但是它是同步的,原因在于和刚刚的例子类似是单线程轮询处理Channel,对资源来说,我们实现了非阻塞,但是实际上读写任务来说,还是同步的。但是:Asynchronous I/O,无论是客户端的连接请求还是读写请求都会异步执行, 由操作系统完成后回调通知服务端程序启动线程去处理, 适用于连接数较多且连接时间较长的应用。

那么我们这里的话就可以使用到这个:Socket通道

举个例子就是这样的:

public class ServerMultiThread 
    public static void main(String[] args) throws Exception 

        ExecutorService executorService = Executors.newCachedThreadPool();
        AsynchronousChannelGroup threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 2);

        final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(threadGroup)
                .bind(new InetSocketAddress(8000));

        serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() 
            @Override
            public void completed(AsynchronousSocketChannel socketChannel, Object attachment) 
                try 
                    serverChannel.accept(attachment, this);
                    System.out.println(socketChannel.getRemoteAddress());
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() 
                        @Override
                        public void completed(Integer result, ByteBuffer attachment) 
                            attachment.flip();
                            System.out.println(new String(attachment.array(), 0, result));
                            socketChannel.write(ByteBuffer.wrap("HelloClient".getBytes()));
                        

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) 
                            exc.printStackTrace();
                        
                    );
                 catch (IOException e) 
                    e.printStackTrace以上是关于水文之浅谈Netty线程模型的主要内容,如果未能解决你的问题,请参考以下文章

多线程之浅谈线程概念

Js之浅谈dom操作

性能基础之浅谈常见接口性能压测

折腾了我一周,原来Netty网络编程就是这么个破玩意儿!!!

JavaScript之浅谈内存空间

react配置之浅谈