NIO实现客户端之间通信
Posted 秋天de枫叶
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了NIO实现客户端之间通信相关的知识,希望对你有一定的参考价值。
NIO被称为New IO,又称NonBlocking IO。他与传统IO的作用与目的是一致,但是在使用方面却有这很大的区别。如图:
NIO的核心分三部分:缓存区,通道、与选择器。
缓冲区:是一个固定数据量的指定基本类型的数据容器。基本 Buffer 类定义了这些属性以及清除、反转 和重绕 等方法进行对缓冲区的操作。(缓存区,byteBufer、LongBuffer、IntBuffer,CharBuffer、ShortBuffer、FloatBuffer、DoubleBuffer)
缓冲区基本属性:容量(capacity)界限(limit)位置(position)标记 (mark)与重置 (reset)
他们之间遵循的关系:0<=mark<=position<=limit<=capacity
缓冲区(Buffer):直接缓冲与间接缓冲区
直接字节缓冲区,是Java 虚拟机直接在此缓冲区上执行本机 I/O操作,减少了之间的copy过程。也就是说,在每次调用基础操作系统的一个本机 I/O 操作之前(或之后),虚拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容)。
直接与间接缓冲区区别,如图:
通道(channel)表示打开到 IO 设备(例如:文件、套接字)的连接。主要的实现类:
FileChannel:用于读取、写入、映射和操作文件的通道。
DatagramChannel:通过 UDP 读写网络中的数据通道。
SocketChannel:通过 TCP 读写网络中的数据。
ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。
对于FileChannel通道的使用。必须经过先创建一个IO流,然后getChannel进行创建,如下:
RandomAccessFile raf = new RandomAccessFile(E:/test.txt,"rw");
FileChannel fchannel=raf.getChannel();
其他通道则可以直接调用Open()方法就行,如下:
ServerSocketChannel serversocket=ServerSocketChannel.open();
SocketChannel socket=SocketChannel.open();
DatagramChannel dc= DatagramChannel.open();
选择器(selector):用于监控通道状态的。
下面通过ServicerSocketChannel,SocketChannel于Selector实现客户端与客户端之间的交互。
首先简历一个服务端,用于转发消息。:
第一步:定义参数
//总服务通道
private ServerSocketChannel serverSocketChannel = null;
//选择器
private Selector selector = null;
//端口
private static int PROT = 4444;
//IP
private static String IP = "127.0.0.1";
//缓冲区
private static ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//设置字符集
private static Charset charset = Charset.forName("utf-8");
//存放消息
private static Set<Message> setMsg = new HashSet<>();
//在线通道
private static Map<String, SocketChannel> onlineMap = new HashMap<>();
private Message Msg = null;
第二步:打开通道与与选择器
private void init() throws IOException
//打开选择器
selector = Selector.open();
//打开服务端通道
serverSocketChannel = ServerSocketChannel.open();
//通道设置为非阻塞
serverSocketChannel.configureBlocking(false);
//建立一个套字接
ServerSocket serverSocket = serverSocketChannel.socket();
//绑定IP端口
serverSocket.bind(new InetSocketAddress(IP, PROT));
//注册服务器通道,通道状态接收就绪。
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
第三步:监听通道与,处理消息
private void listenServer() throws IOException
System.out.println("服务端启动成功");
while (true)
//当消息不为null
if (setMsg != null && !setMsg.isEmpty())
//获取一条消息
Msg = setMsg.iterator().next();
//删除
setMsg.remove(Msg);
SocketChannel socketChannel = onlineMap.get(Msg.getDest());
registerChannel(socketChannel, SelectionKey.OP_WRITE);
if (selector.select(100) == 0)
continue;
//获取所有选择器
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
//循环遍历迭代器
while (it.hasNext())
//获取通道对象所对应的SelectionKey
SelectionKey key = (SelectionKey) it.next();
//防止重复
it.remove();
//如果监听的这个通道对象整处于接收就绪状态
if (key.isAcceptable())
try
System.out.println("=============OP_ACCEPT========");
handleAccept(key);
catch (IOException e)
e.printStackTrace();
if (key.isReadable())
try
System.out.println("=============OP_READ==========");
handleRead(key);
catch (IOException e)
e.printStackTrace();
if (key.isWritable())
try
System.out.println("=============OP_Write==========");
handleWrite(key);
catch (IOException e)
e.printStackTrace();
//接收就绪操作。
private void handleAccept(SelectionKey key) throws IOException
//获取到服务端通道
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
//建立一个套字接通道
SocketChannel socketChannel = serverSocketChannel.accept();
//通道连接
if (socketChannel.isConnected())
//初始化byteBuffer
byteBuffer.clear();
//服务器给客户端返回连接成功消息
byteBuffer.put(new String("系统消息:连接成功...").getBytes());
//设置byteBuffer,position、limit属性
byteBuffer.flip();
//写入通道
socketChannel.write(byteBuffer);
//设置为读就绪操作
registerChannel(socketChannel, SelectionKey.OP_READ);
/**
* 读操作
*
* @param key
*/
private void handleRead(SelectionKey key) throws IOException
String content = "";
//获取到通道
SocketChannel socketChannel = (SocketChannel) key.channel();
//设置一个StringBuffer
StringBuffer strBuffer = new StringBuffer();
//清理缓存区
byteBuffer.clear();
//读取缓冲区数据
while (socketChannel.read(byteBuffer) > 0)
byteBuffer.flip();
while (byteBuffer.hasRemaining())
strBuffer.append(charset.decode(byteBuffer));
content = strBuffer.toString();
System.out.println(content);
//通道状态设置读取
registerChannel(socketChannel, SelectionKey.OP_READ);
//消息转发
if ((content.indexOf("connection server success") != -1))
String[] arr = content.split("\\\\|");
onlineMap.put(arr[0], socketChannel);
else
String[] arr = content.split("\\\\|");
Message msg = new Message(arr[0], arr[1], arr[2], Long.parseLong(String.valueOf(arr[3])));
setMsg.add(msg);
/**
* 写操作
*
* @param key
*/
private void handleWrite(SelectionKey key) throws IOException
//获取到通道
SocketChannel socketChannel = (SocketChannel) key.channel();
//清理缓存区
byteBuffer.clear();
byteBuffer.put((Msg.getUser() + ":" + Msg.getContent()).getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
//清理缓存区
byteBuffer.clear();
//通道状态设置读取
registerChannel(socketChannel, SelectionKey.OP_READ);
private void registerChannel(SocketChannel socketChannel, int ops)
try
if (socketChannel == null)
return;
//设置非阻塞
socketChannel.configureBlocking(false);
//注册通道
socketChannel.register(selector, ops);
catch (IOException e)
e.printStackTrace();
第五步:启动服务:
public static void main(String[] args)
try
NioserverCenter server = new NioServerCenter();
//初始化服务端通道
server.init();
//进行监听服务端通道
server.listenServer();
catch (IOException e)
e.printStackTrace();
客户端Client:
private SocketChannel channel = null;
private Selector selector = null;
private static int PROT = 4444;
private static String IP = "127.0.0.1";
private static ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
public static Set<Message> msgSet = new HashSet<>();
private Message msg = null;
public static String username = null;
//设置字符集
private static Charset charset = Charset.forName("utf-8");
public void init() throws IOException
//打开选择器
selector = Selector.open();
//打开套字接通道
channel = SocketChannel.open();
//设置非阻塞
channel.configureBlocking(false);
//绑定IP,端口
channel.connect(new InetSocketAddress(IP, PROT));
//注册通道,设置为链接就绪
channel.register(selector, SelectionKey.OP_CONNECT);
public void listenClient() throws IOException
while (true)
if (!msgSet.isEmpty())
//获取一条消息
msg = msgSet.iterator().next();
//从消息集合中移除
msgSet.remove(msg);
//通道标记为写就绪
registerChannel(channel, SelectionKey.OP_WRITE);
if (selector.select(100) == 0)
continue;
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext())
//获取通道对象所对应的SelectionKey
SelectionKey key = (SelectionKey) it.next();
//防止重复
it.remove();
//如果监听的这个通道对象整处于链接就绪状态
if (key.isConnectable())
handleConnection(key);
//读就绪状态
if (key.isReadable())
handleRead(key);
//写就绪状态
if (key.isWritable())
handleWrite(key);
private void handleConnection(SelectionKey key) throws IOException
if (key.isConnectable())
channel = (SocketChannel) key.channel();
if (channel.isConnectionPending())
channel.finishConnect();
byteBuffer.clear();
byteBuffer.put(new Message(username,"client","connection server success",
new Date().getTime()).toString().getBytes());
byteBuffer.flip();
channel.write(byteBuffer);
registerChannel(channel, SelectionKey.OP_READ);
else
//退出
System.exit(-1);
/**
* 读操作
*
* @param key
*/
private void handleRead(SelectionKey key) throws IOException
//获取到通道
channel = (SocketChannel) key.channel();
//设置一个StringBuffer
StringBuffer strBuffer = new StringBuffer();
//清理缓存区
byteBuffer.clear();
//读取缓冲区数据
while (channel.read(byteBuffer) > 0)
byteBuffer.flip();
while (byteBuffer.hasRemaining())
strBuffer.append(charset.decode(byteBuffer));
System.out.println(strBuffer.toString());
registerChannel(channel, SelectionKey.OP_READ);
private void handleWrite(SelectionKey key) throws IOException
//获取到通道
channel = (SocketChannel) key.channel();
//初始化byteBuffer
byteBuffer.clear();
if (channel.isConnected())
if (msg != null)
//服务器给客户端返回连接成功消息
byteBuffer.put(ByteBuffer.wrap(msg.toString().getBytes()));
//设置byteBuffer,position、limit属性
byteBuffer.flip();
//写入通道
channel.write(byteBuffer);
//设置为读操作
byteBuffer.clear();
registerChannel(channel, SelectionKey.OP_READ);
else
System.out.println("客户端,断开连接。。。");
channel.close();
private void registerChannel(SocketChannel socketChannel, int ops)
try
if (socketChannel == null)
return;
//设置非阻塞
socketChannel.configureBlocking(false);
//注册通道
socketChannel.register(selector, ops);
catch (IOException e)
e.printStackTrace();
创建实例1:SXL
public static void main(String[] args)
NioClients nioClient = new NioClients();
new Thread(new Runnable()
@Override
public void run()
Scanner in = null;
boolean flag = true;
String dest = "";
while (true)
//休息一秒
try
Thread.sleep(1000);
catch (InterruptedException e)
e.printStackTrace();
if ("".equals(dest))
System.out.print("请输入接收者:");
in = new Scanner(System.in);
dest = in.nextLine();
flag = true;
if (flag)
System.out.print("消息内容:");
flag = false;
in = new Scanner(System.in);
String content = in.nextLine();
long time = new Date().getTime();
NioClients.msgSet.add(new Message("SXL",dest, content, time));
).start();
try
NioClients.username = "SXL";
nioClient.init();
nioClient.listenClient();
catch (IOException e)
e.printStackTrace();
创建实例2:SK
public static void main(String[] args)
NioClients nioClient = new NioClients();
new Thread(new Runnable()
@Override
public void run()
Scanner in = null;
boolean flag = true;
String dest = "";
while (true)
//休息一秒
try
Thread.sleep(1000);
catch (InterruptedException e)
e.printStackTrace();
if ("".equals(dest))
System.out.print("请输入接收者:");
in = new Scanner(System.in);
dest = in.nextLine();
flag = true;
if (flag)
System.out.print("消息内容:");
flag = false;
in = new Scanner(System.in);
String content = in.nextLine();
long time = new Date().getTime();
NioClients.msgSet.add(new Message("SK", dest, content, time));
).start();
try
NioClients.username = "SK";
nioClient.init();
nioClient.listenClient();
catch (IOException e)
e.printStackTrace();
这样两个客户端SXL与SK就可以互相通信了。
Demo:https://download.csdn.net/download/xiaolinabc/10774962
以上是关于NIO实现客户端之间通信的主要内容,如果未能解决你的问题,请参考以下文章
网络编程之每天学习一点点[day4]-----nio实现单向通信
JAVA NIO 异步TCP服务端向客户端消息群发代码教程实战