水文之浅谈Netty线程模型
Posted Huterox
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了水文之浅谈Netty线程模型相关的知识,希望对你有一定的参考价值。
文章目录
前言
看到这个标题,可能有小伙伴要奇怪了,你的TSP三个解法的系列博文去哪了?好吧,我承认我有鸽的成分,今天只是想要水一篇博文(狗头)。但是文章的质量也确实是要保证的,所以,虽然不是这个使用强化学习解决TSP问题的算法,但是咱们今天的内容也确实是比较基本,比较常见的玩意。
那么开始之前咧,我们先随便聊聊阻塞和非阻塞,毕竟就这么来的玩意。
基本概念
那么谈到这个玩意的话,所涉及的玩意无非就这几个,同步异步阻塞,之后是BIO,NIO,AIO
同步
同步异步 , 举个例子来说,一家餐厅吧来了5个客人,同步的意思就是说,来第一个点菜,点了个鱼,好, 厨师去捉鱼杀鱼,过了半小时鱼好了给第一位客人,开始下位一位客人,就这样一个一个来,按顺序来。
异步
异步呢,异步的意思就是来第一位客人,点什么,点鱼,给它一个牌子,让他去一边等吧,下一位客人接着点菜,点完接着点让厨师做去吧,哪个的菜先好就先端出来,
同步的优点是:同步是按照顺序一个一个来,不会乱掉,更不会出现上面代码没有执行完就执行下面的代码, 缺点:是解析的速度没有异步的快;
异步的优点是:异步是接取一个任务,直接给后台,在接下一个任务,一直一直这样,谁的先读取完先执行谁的, 缺点:没有顺序 ,谁先读取完先执行谁的 。
同步其实说白了就是专一,一件事情一件事情去做,老老实实把这件事情做好了才去做现下一件事情,那么异步就是影分身,同时处理多个事情,他可能是使用多线程的方案,也可能是采用轮训的方案,例如海王,他就是属于轮训方案(开个玩笑话,你痛恨海王只是可恶自己不能成为海王(狗头))。
阻塞
说完,那么来说说阻塞是神马玩意,这个其实也简单,先前咱们的同步,异步其实是相对于任务来说的,而咱们的阻塞其实是相对于资源来说的,例如我们的IO资源。我们在这里其实做一个区分的及时,我们的阻塞是对资源的调度,同步,异步是指应用层对资源的使用。例如我们同步获取一个资源,并且必须等待资源加载完毕,那么就叫做同步阻塞,类似于这样,我们拿这个举例子:
for 事件 in 一堆事情:
通知厨师做菜
等待菜品做好
端出去给顾客
for这段是同步的,是在说明调用资源。里面等待菜品做好就是在等待资源,做菜就是对资源进行处理。
非阻塞是这样的:
for 事件 in 一堆事情:
通知厨师做菜
告诉顾客在做菜了
之后:
厨师:
接到通知
做好菜品
按下响铃
送去菜品
也就是非阻塞,他是有一个回调处理的。这个回调是通知程序做好了,资源结束了(IO)
其实用这个图会更好理解:
ok,那么说完了这些之后呢,我们就可以来说说接下来这几个个东西了:
首先是我们的:
BIO
这个东西其实说白了就是这个:
for 事件 in 一堆事情:
通知厨师做菜
等待菜品做好
端出去给顾客
同步调用资源。
不过值得一提的是,在BIO里面其实还有一个伪异步IO(异步阻塞)。其实就是开了一个线程池,实现一对多,类似于影分身,这个和海王最大的区别是,海王只能做到简单聊天的异步,但是这个可以做到视频的异步,也就是影分身,多开,当然对资源的消耗是显而易见的。当然还是对IO操作。
线程池
for 事件 in 一堆事情:
将事件添加至线程池
线程池执行里面的事件
也就是说:
事件:
通知厨师做菜
等待菜品做好
端出去给顾客
案例(聊天室)
这个的话,比较容易搞混,其实也简单,你就想想多开助手嘛。那么关于这个BIO是有一个案例的,也就是最简单的聊天室的一个案例。那么关于NIO的话如果是一年前,给你手撸NettyLite都可以,现在不行了,事情太多了,后面咱们在WhiteHole开发的时候整合部分就能看到了,Netty实时聊天,消息推送+MQ异步存储(当然MQ不一定有必要,但是LZ连ES都用了再加一个又何妨,做戏做全套嘛)。OK,那么咱们的这个案例的话,是这样的(也是以前的老代码了)
流程
流程是这样的:
消息定义
明确了这个玩意的话,咱们来看看这个消息的定义,毕竟是聊天嘛:
import java.io.Serializable;
public class Message implements Serializable
private String from;//消息发送者
private String to;//消息接收者
private int type;//消息类型
private String info;//发送的消息内容
public Message()
public Message(String from, String to, int type, String info)
this.from = from;
this.to = to;
this.type = type;
this.info = info;
public String getFrom()
return from;
public void setFrom(String from)
this.from = from;
public String getTo()
return to;
public void setTo(String to)
this.to = to;
public int getType()
return type;
public void setType(int type)
this.type = type;
public String getInfo()
return info;
public void setInfo(String info)
this.info = info;
在这里的话还有一个Type定义:
public final class MessageType
public static final int TYPE_LOGIN = 0x1;//登录的消息类型
public static final int TYPE_SEND = 0x2; //发送的消息类型
服务端代码
这个代码就很简陋了,但是能看:
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Server
//服务器端负责数据的中转
public static void main(String[] args)
ExecutorService es = Executors.newFixedThreadPool(6);//线程池大小最多同时支持6个人聊天
Vector<UserThread> vector = new Vector<>();//存储客户端线程
try
ServerSocket server = new ServerSocket(8888);
System.out.println("服务器已启动......");
while(true)
Socket socket = server.accept();
UserThread user = new UserThread(socket,vector);
es.execute(user);
catch(IOException e)
e.printStackTrace();
class UserThread implements Runnable
private Socket socket;
private Vector<UserThread> vector;
private String name;
private ObjectInputStream ois;
private ObjectOutputStream oos;
private boolean flag=true;
public UserThread(Socket socket, Vector<UserThread> vector)
this.socket = socket;
this.vector = vector;
this.vector.add(this);
public UserThread()
@Override
public void run()
// TODO Auto-generated method stub
try
System.out.println("[Host:"+socket.getInetAddress().getHostAddress()+"-连接]");
ois = new ObjectInputStream(socket.getInputStream());
oos = new ObjectOutputStream(socket.getOutputStream());
while(flag)
Message msg = (Message) ois.readObject();
int type = msg.getType();
switch(type)
case MessageType.TYPE_LOGIN:
//登录
this.name = msg.getFrom();
msg.setInfo("欢迎你:"+this.name);
oos.writeObject(msg);
break;
case MessageType.TYPE_SEND:
//发送消息
String sendto = msg.getTo();
for(UserThread user:vector)
if(sendto.equals(user.name)&&user!=this)
//寻找
user.oos.writeObject(msg);
System.out.println(this.name+"send message to"+msg.getTo()+"--success");
break;
else
System.out.println(this.name+"send message to"+msg.getTo()+"--fail");
break;
oos.close();
ois.close();
catch(IOException | ClassNotFoundException e)
// e.printStackTrace();
System.out.println("Closed"+name);
return;
服务端做的事情很简单,首先我们有一个线程池,还有一个定义的用户线程类。当链接过来的时候,这个线程类可以获取到用户的IO流,并且保存链接,也就是这个流不释放(里面有个White),同时将这个线程类装载到咱们的统一管理的列表里面,方便管理,数据互通。
客户端
之后是我们的客户端,同样的,他有两个点,一个是发送消息,一个是时刻监听咱们服务器的消息。
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Client
public static void main(String[] args)
ExecutorService es = Executors.newSingleThreadExecutor();
Scanner input = new Scanner(System.in);
try
Socket socket = new Socket("localhost",8888);
System.out.println("已连接至服务器");
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
//登录
System.out.print("请输入用户名:");
String username = input.nextLine();
Message msg = new Message(username,null,MessageType.TYPE_LOGIN,null);
oos.writeObject(msg);
msg = (Message)ois.readObject();
System.out.println(msg.getInfo());
//与其他用户接收发送交流消息
//读取其他用户的消息
// new Thread(new ReadInfoThread(ois)).start();
es.execute(new ReadInfoThread(ois));
//发送直接在主线程发送
boolean flag = true;
while(flag)
msg = new Message();
System.out.print("To:");
msg.setTo(input.nextLine());
msg.setFrom(username);
msg.setType(MessageType.TYPE_SEND);
System.out.print("\\nmsg:");
msg.setInfo(input.nextLine());
oos.writeObject(msg);
catch(IOException | ClassNotFoundException e)
e.printStackTrace();
class ReadInfoThread implements Runnable
private ObjectInputStream ois;
private boolean flag = true;
public ReadInfoThread(ObjectInputStream ois)
this.ois = ois;
@Override
public void run()
// TODO Auto-generated method stub
try
while(flag)
Message msg=(Message)ois.readObject();
System.out.println();
System.out.println("From:["+msg.getFrom()+"]say:"+msg.getInfo());
System.out.print("To:");
if(ois!=null)
ois.close();
catch (ClassNotFoundException e)
// TODO Auto-generated catch block
e.printStackTrace();
catch (IOException e)
// TODO Auto-generated catch block
e.printStackTrace();
效果是这样的:
NIO
之后是俺们的NIO了,它是同步非阻塞的。其实这个玩意和咱们刚刚的聊天的玩意最大的区别就是这样的,首先是利用到了咱们JAVA的一个特性,也就是NIO的一个特性Channel。
说白了就是这个
从而代替了线程池,先前咱们用的是Steam,所以需要那个玩意来维持双向数据传输。并且没有双向,我还得等也就是在线程内确定数据传输完毕,也就是通过异步的形式来处理,但是Channel他是非阻塞的读取,之后切换到下一个Channel从而完成处理。
原因大概是这样的:
AIO
这个是在咱们NIO的基础上再做处理,叫做异步非阻塞。NIO实现了非阻塞。但是它是同步的,原因在于和刚刚的例子类似是单线程轮询处理Channel,对资源来说,我们实现了非阻塞,但是实际上读写任务来说,还是同步的。但是:Asynchronous I/O,无论是客户端的连接请求还是读写请求都会异步执行, 由操作系统完成后回调通知服务端程序启动线程去处理, 适用于连接数较多且连接时间较长的应用。
那么我们这里的话就可以使用到这个:Socket通道
举个例子就是这样的:
public class ServerMultiThread
public static void main(String[] args) throws Exception
ExecutorService executorService = Executors.newCachedThreadPool();
AsynchronousChannelGroup threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 2);
final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(threadGroup)
.bind(new InetSocketAddress(8000));
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>()
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment)
try
serverChannel.accept(attachment, this);
System.out.println(socketChannel.getRemoteAddress());
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>()
@Override
public void completed(Integer result, ByteBuffer attachment)
attachment.flip();
System.out.println(new String(attachment.array(), 0, result));
socketChannel.write(ByteBuffer.wrap("HelloClient".getBytes()));
@Override
public void failed(Throwable exc, ByteBuffer attachment)
exc.printStackTrace();
);
catch (IOException e)
e.printStackTrace以上是关于水文之浅谈Netty线程模型的主要内容,如果未能解决你的问题,请参考以下文章