Java 基础 -- NIO 多人聊天室
Posted 周无极
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 基础 -- NIO 多人聊天室相关的知识,希望对你有一定的参考价值。
package com.atguigu.nio.groupchat; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; public class GroupChatServer //定义属性 private Selector selector; private ServerSocketChannel listenChannel; private static final int PORT = 6667; //构造器 //初始化工作 public GroupChatServer() try //得到选择器 selector = Selector.open(); //ServerSocketChannel listenChannel = ServerSocketChannel.open(); //绑定端口 listenChannel.socket().bind(new InetSocketAddress(PORT)); //设置非阻塞模式 listenChannel.configureBlocking(false); //将该listenChannel 注册到selector listenChannel.register(selector, SelectionKey.OP_ACCEPT); catch (IOException e) e.printStackTrace(); //监听 public void listen() System.out.println("监听线程: " + Thread.currentThread().getName()); try //循环处理 while (true) int count = selector.select(); if(count > 0) //有事件处理 //遍历得到selectionKey 集合 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) //取出selectionkey SelectionKey key = iterator.next(); //监听到accept if(key.isAcceptable()) SocketChannel sc = listenChannel.accept(); sc.configureBlocking(false); //将该 sc 注册到seletor sc.register(selector, SelectionKey.OP_READ); //提示 System.out.println(sc.getRemoteAddress() + " 上线 "); if(key.isReadable()) //通道发送read事件,即通道是可读的状态 //处理读 (专门写方法..) readData(key); //当前的key 删除,防止重复处理 iterator.remove(); else System.out.println("等待...."); catch (Exception e) e.printStackTrace(); finally //发生异常处理.... //读取客户端消息 private void readData(SelectionKey key) //取到关联的channle SocketChannel channel = null; try //得到channel channel = (SocketChannel) key.channel(); //创建buffer ByteBuffer buffer = ByteBuffer.allocate(1024); int count = channel.read(buffer); //根据count的值做处理 if(count > 0) //把缓存区的数据转成字符串 String msg = new String(buffer.array()); //输出该消息 System.out.println("form 客户端: " + msg); //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理 sendInfoToOtherClients(msg, channel); catch (IOException e) try System.out.println(channel.getRemoteAddress() + " 离线了.."); //取消注册 key.cancel(); //关闭通道 channel.close(); catch (IOException e2) e2.printStackTrace();; //转发消息给其它客户(通道) private void sendInfoToOtherClients(String msg, SocketChannel self ) throws IOException System.out.println("服务器转发消息中..."); System.out.println("服务器转发数据给客户端线程: " + Thread.currentThread().getName()); //遍历 所有注册到selector 上的 SocketChannel,并排除 self for(SelectionKey key: selector.keys()) //通过 key 取出对应的 SocketChannel Channel targetChannel = key.channel(); //排除自己 if(targetChannel instanceof SocketChannel && targetChannel != self) //转型 SocketChannel dest = (SocketChannel)targetChannel; //将msg 存储到buffer ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); //将buffer 的数据写入 通道 dest.write(buffer); public static void main(String[] args) //创建服务器对象 GroupChatServer groupChatServer = new GroupChatServer(); groupChatServer.listen(); //可以写一个Handler class MyHandler public void readData() public void sendInfoToOtherClients()
package com.atguigu.nio.groupchat; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import java.util.Set; public class GroupChatClient //定义相关的属性 private final String HOST = "127.0.0.1"; // 服务器的ip private final int PORT = 6667; //服务器端口 private Selector selector; private SocketChannel socketChannel; private String username; //构造器, 完成初始化工作 public GroupChatClient() throws IOException selector = Selector.open(); //连接服务器 socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT)); //设置非阻塞 socketChannel.configureBlocking(false); //将channel 注册到selector socketChannel.register(selector, SelectionKey.OP_READ); //得到username username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username + " is ok..."); //向服务器发送消息 public void sendInfo(String info) info = username + " 说:" + info; try socketChannel.write(ByteBuffer.wrap(info.getBytes())); catch (IOException e) e.printStackTrace(); //读取从服务器端回复的消息 public void readInfo() try int readChannels = selector.select(); if(readChannels > 0) //有可以用的通道 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) SelectionKey key = iterator.next(); if(key.isReadable()) //得到相关的通道 SocketChannel sc = (SocketChannel) key.channel(); //得到一个Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //读取 sc.read(buffer); //把读到的缓冲区的数据转成字符串 String msg = new String(buffer.array()); System.out.println(msg.trim()); iterator.remove(); //删除当前的selectionKey, 防止重复操作 else //System.out.println("没有可以用的通道..."); catch (Exception e) e.printStackTrace(); public static void main(String[] args) throws Exception //启动我们客户端 GroupChatClient chatClient = new GroupChatClient(); //启动一个线程, 每个3秒,读取从服务器发送数据 new Thread() public void run() while (true) chatClient.readInfo(); try Thread.currentThread().sleep(3000); catch (InterruptedException e) e.printStackTrace(); .start(); //发送数据给服务器端 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) String s = scanner.nextLine(); chatClient.sendInfo(s);
Java NIO实现的C/S模式多人聊天工具
小弟初学NIO,做了个控制台聊天工具,不知道代码写的如何,望大神们批评指点。服务器端,两个线程,一个处理客户端请求和转发消息,另一个处理服务器管理员指令,上代码:
package kindz.onlinechat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
public class Server {
private static List<SocketChannel> clientList = new LinkedList<SocketChannel>();// 客户端列表
private static Selector clientManager = null;// 通道管理器
private static ServerSocketChannel server = null;// 服务器通道
private static ByteBuffer buff = ByteBuffer.allocate(1500);// 缓冲器
private static int port = 3333;
public static void main(String[] args) {
if (args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
System.out.println("端口号只能为数字");
return;
}
}
try {
// 初始化失败直接退出
if (!init())
return;
while (clientManager.isOpen()) {
select();
// 获取就绪的key列表
Set<SelectionKey> keys = clientManager.selectedKeys();
// 遍历事件并处理
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
// 判断key是否有效
if (!key.isValid()) {
it.remove();// 要移除
continue;
}
if (key.isAcceptable()) {// 有请求
accept(key);
} else if (key.isReadable()) {// 有数据
broadcast(key);
}
it.remove();
}
}
} catch (ClosedSelectorException | CancelledKeyException e) {// 一定是其他线程关闭了管理器
} finally {
try {
if (clientManager != null)
clientManager.close();
} catch (IOException e) {
}
try {
if (server != null)
server.close();
} catch (IOException e) {
}
closeAll();
System.out.println("服务器已停止");
}
}
// 初始化
private static boolean init() {
System.out.println("服务器启动中...");
try {
// 获取管理器
clientManager = Selector.open();
} catch (IOException e) {
System.out.println("服务器启动失败,原因:通道管理器无法获取");
return false;
}
try {
// 打开通道
server = ServerSocketChannel.open();
} catch (IOException e) {
System.out.println("服务器启动失败,原因:socket通道无法打开");
return false;
}
try {
// 绑定端口
server.socket().bind(new InetSocketAddress(port));
} catch (IOException e) {
System.out.println("服务器启动失败,原因:端口号不可用");
return false;
}
try {
// 设置成非阻塞模式
server.configureBlocking(false);
} catch (IOException e) {
System.out.println("服务器启动失败,原因:非阻塞模式切换失败");
return false;
}
try {
// 注册到管理器中,只监听接受连接事件
server.register(clientManager, SelectionKey.OP_ACCEPT);
} catch (ClosedChannelException e) {
System.out.println("服务器启动失败,原因:服务器通道已关闭");
return false;
}
Thread service = new Thread(new ServerService(clientManager));// 提供管理员指令服务线程
service.setDaemon(true);// 设置为后台线程
service.start();
System.out.println("服务器启动成功");
return true;
}
// 等待事件
private static void select() {
try {
// 等待事件
clientManager.select();
} catch (IOException e) {
// 忽略未知的异常
}
}
// 此方法获取请求的socket通道并添加到客户端列表中,当然还要注册到管理器中
private static void accept(SelectionKey key) {
SocketChannel socket = null;
try {
// 接受请求的连接
socket = ((ServerSocketChannel) key.channel()).accept();
} catch (IOException e) {// 连接失败
}
if (socket == null)
return;
SocketAddress address = null;
try {
address = socket.getRemoteAddress();
// 注册
socket.configureBlocking(false);
socket.register(clientManager, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {// 注册失败
try {
if (socket != null)
socket.close();
} catch (IOException e1) {
}
return;
} catch (IOException e) {
try {
if (socket != null)
socket.close();
} catch (IOException e1) {
}
return;
}
// 添加到客户端列表中
clientList.add(socket);
System.out.println("主机" + address + "连接到服务器");
}
// 此方法接收数据并发送个客户端列表的每一个人
private static void broadcast(SelectionKey key) {
SocketChannel sender = (SocketChannel) key.channel();
// 方法结束不清理
buff.clear();
int status = -1;
try {
// 读取数据
status = sender.read(buff);
} catch (IOException e) {// 未知的io异常
status = -1;
}
if (status <= 0) {// 异常断开连接,并移除此客户端
remove(sender);
return;
}
// 发送给每一个人
for (SocketChannel client : clientList) {
// 除了他或她自己
if (client == sender)
continue;
buff.flip();
try {
client.write(buff);
} catch (IOException e) {// 发送失败,移除此客户端
remove(client);
}
}
}
private static void remove(SocketChannel client) {
SocketAddress address = null;// 存储主机地址信息
clientList.remove(client);// 从列表中移除
try {
address = client.getRemoteAddress();//获取客户端地址信息
} catch (IOException e1) {
}
try {
client.close();// 关闭连接
} catch (IOException e1) {
}
client.keyFor(clientManager).cancel();// 反注册
System.out.println("与主机" + address + "断开连接");
}
// 关闭列表中全部通道
private static void closeAll() {
for (SocketChannel client : clientList) {
try {
if (client != null)
client.close();
} catch (IOException e) {
}
}
}
}
客户端也是两个线程,一个循环等待接收消息,另一个处理用户输入以及发送:
package kindz.onlinechat;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
public class Client implements Runnable {
private static String ip = null;
private static String name = null;
private static String serverHost="127.0.0.1";//服务器地址
private static int port=3333;//服务器端口号
private static SocketChannel socket = null;// 与服务器连接通道
public static void main(String[] args) {
if(args.length>0){
if (args[0].indexOf(':') == -1) {
System.err.println("目标地址格式不正确");
return;
}
serverHost = args[0].split(":")[0];
try {
port = Integer.parseInt(args[0].split(":")[1]);
} catch (NumberFormatException e) {
System.err.println("端口号只能为数字");
return;
}
}
try {
// 初始化失败退出程序
if (!init())
return;
ByteBuffer buff = ByteBuffer.allocate(1500);// 字节缓冲器
while (socket.read(buff) != -1) {// 读取信息
String msg = new String(buff.array(), 0, buff.position());// 转成字符串
buff.clear();// 清理
System.out.println(msg);
}
} catch (IOException e) {
System.out.println("与服务器断开连接");
} finally {
close();
System.out.println("程序已退出");
}
}
// 输入线程
@SuppressWarnings("resource")
public void run() {
try {
Scanner sc = new Scanner(System.in);
// 循环等待输入
while (sc.hasNextLine()) {
String msg = sc.nextLine();
if (".exit".equals(msg)){
socket.write(ByteBuffer.wrap((name+"-"+ip+"下线了").getBytes()));// 发送下线信息
break;
}
msg = name + "-" + ip + ":" + msg;
socket.write(ByteBuffer.wrap(msg.getBytes()));
}
} catch (IOException e) {
System.out.println("与服务器断开连接");
} finally {
close();
}
}
// 初始化程序
private static boolean init() {
System.out.println("正在连接至服务器...");
try {
socket = SocketChannel
.open(new InetSocketAddress(serverHost, port));// 打开通道
} catch (IOException e) {
System.out.println("无法连接到服务器");
return false;
}
System.out.println("已连接至服务器");
try {
InetAddress address = InetAddress.getLocalHost();// 获取本机网络信息
ip = address.getHostAddress();// 本机ip
name = address.getHostName();// 主机名
socket.write(ByteBuffer.wrap((name+"-"+ip+"上线了").getBytes()));// 发送上线信息
} catch (IOException e) {
System.out.println("网络异常");
return false;
}
Thread thread = new Thread(new Client());
thread.setDaemon(true);// 设置后台线程
thread.start();
return true;
}
// 关闭通道
private static void close() {
try {
if (socket != null)
socket.close();
} catch (IOException e) {
}
}
}
写完感觉还是挺简单的,但是本人一直从事java web开发,异常处理做的比较少,不知道我这个处理的怎么样。
由于没有那么多好基友帮忙测试,我还写了虚拟客户端来模拟好基友:
package kindz.onlinechat;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class VClient implements Runnable {
private String ip = null;
private String name = null;
private String serverHost = "127.0.0.1";// 服务器ip地址
private int port = 3333;// 服务器端口号
private SocketChannel socket = null;// 与服务器连接通道
private static String[] msgs = {
"大家好",
"好困啊",
"今天该干什么啊",
"我这任务好多,先不聊了",
"jQuery是继prototype之后又一个优秀的Javascript库。它是轻量级的js库 ,它兼容CSS3,还兼容各种浏览器(IE 6.0+, FF 1.5+, Safari 2.0+, Opera 9.0+),jQuery2.0及后续版本将不再支持IE6/7/8浏览器。jQuery使用户能更方便地处理HTML(标准通用标记语言下的一个应用)、events、实现动画效果,并且方便地为网站提供AJAX交互。jQuery还有一个比较大的优势是,它的文档说明很全,而且各种应用也说得很详细,同时还有许多成熟的插件可供选择。jQuery能够使用户的html页面保持代码和html内容分离,也就是说,不用再在html里面插入一堆js来调用命令了,只需要定义id即可[8]。",
"谁那有咖啡", "谁有时间帮我去取个快递", ".exit" };
private Random random=new Random();
public VClient(String serverHost, int port) {
this.serverHost = serverHost;
this.port = port;
}
public void run() {
try {
// 初始化失败退出程序
if (!init())
return;
ByteBuffer buff = ByteBuffer.allocate(1500);// 字节缓冲器
while (!Thread.interrupted()&&socket.read(buff) != -1) {// 读取信息,中断退出
String msg = new String(buff.array(), 0, buff.position());// 转成字符串
buff.clear();// 清理
System.out.println(msg);
}
} catch (IOException e) {
System.out.println("与服务器断开连接");
} finally {
close();
System.out.println("程序已退出");
}
}
// 初始化程序
private boolean init() {
System.out.println("正在连接至服务器...");
try {
socket = SocketChannel
.open(new InetSocketAddress(serverHost, port));// 打开通道
} catch (IOException e) {
System.out.println("无法连接到服务器");
return false;
}
System.out.println("已连接至服务器");
try {
InetAddress address = InetAddress.getLocalHost();// 获取本机网络信息
ip = address.getHostAddress();// 本机ip
name = address.getHostName();// 主机名
socket.write(ByteBuffer.wrap((name + "-" + ip + "上线了").getBytes()));// 发送上线信息
} catch (IOException e) {
System.out.println("网络异常");
return false;
}
Thread thread = new Thread(new Daemon());// 私有内部类
thread.setDaemon(true);// 设置后台线程
thread.start();
return true;
}
// 关闭通道
private void close() {
try {
if (socket != null)
socket.close();
} catch (IOException e) {
}
}
// 私有内部类、守护线程、输出用
private class Daemon implements Runnable {
public void run() {
try {
// 自动循环发送消息
while (true) {
String msg = msgs[random.nextInt(msgs.length)];// 随便拿一个写好的信息
if (".exit".equals(msg)) {
socket.write(ByteBuffer.wrap((name + "-" + ip + "下线了")
.getBytes()));// 发送下线信息
break;
}
msg = name + "-" + ip + ":" + msg;
socket.write(ByteBuffer.wrap(msg.getBytes()));
TimeUnit.SECONDS.sleep(random.nextInt(9) + 2);//模拟用户输入过程,2-10秒,平均6秒发一次
}
} catch (IOException e) {
System.out.println("与服务器断开连接");
} catch (InterruptedException e) {
System.out.println("与服务器断开连接");
} finally {
close();
}
}
}
}
虚拟客户端只是个线程,我给它配了个管理器:
package kindz.onlinechat;
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class VManager implements Runnable {
private static String serverHost = "127.0.0.1";
private static int port = 3333;
private static int num = 3;// 初始化虚拟客户端数目
private static int min = 5;// 最小虚拟客户端数目
private static int max = 15;// 最大虚拟客户端数目
private static Random random=new Random();
@SuppressWarnings("resource")
public static void main(String[] args) {
if (args.length > 0) {
if (args[0].indexOf(':') == -1) {
System.err.println("目标地址格式不正确");
return;
}
serverHost = args[0].split(":")[0];
try {
port = Integer.parseInt(args[0].split(":")[1]);
} catch (NumberFormatException e) {
System.err.println("端口号只能为数字");
return;
}
}
if (args.length > 1) {
try {
num = Integer.parseInt(args[1]);
} catch (NumberFormatException e) {
System.err.println("初始化数目只能为数字");
return;
}
}
if (args.length > 2) {
try {
min = Integer.parseInt(args[2]);
} catch (NumberFormatException e) {
System.err.println("最小数目只能为数字");
return;
}
}
if (args.length > 3) {
try {
max = Integer.parseInt(args[3]);
} catch (NumberFormatException e) {
System.err.println("最大数目只能为数字");
return;
}
}
if (max < num) {
System.err.println("初始化数量不能大于最大数量");
return;
}
if (max < min) {
System.err.println("最小数量不能大于最大数量");
return;
}
Thread manager = new Thread(new VManager());
manager.start();
Scanner sc = new Scanner(System.in);
String arg = null;// 指令
while (sc.hasNextLine()) {
arg = sc.nextLine();// 输入指令
if ("shutdown".equals(arg)) {
manager.interrupt();
break;
} else {
System.out.println("未知的指令");
}
}
}
public void run() {
ExecutorService manager = Executors.newFixedThreadPool(max);// 线程池管理器
// 初始化几个客户端
for (int i = 0; i < num; i++) {
manager.execute(new VClient(serverHost, port));
}
try {
int interval=(int)(2*48000.0/min)+1;//用户上线间隔时间范围
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(random.nextInt(interval));// 用户大概48秒会下线,尽量保持最少用户个数
manager.execute(new VClient(serverHost, port));
}
} catch (InterruptedException e) {
} finally {
System.out.println("正在停止所有客户端...");
manager.shutdownNow();// 中断所有客户端
}
}
}
虚拟客户端大概平均6秒发一次消息,每次下线的几率是1/8,所以虚拟客户端大概在上线48秒左右的时候会下线,因此想要保证(只能是尽量保证)最小虚拟客户端数量,只要保证48秒内上线固定数量的用户即可,不过在这里,虚拟客户端增加的频率也是随机的,感觉更真实些。
本人想转Java底层的工作,因此在努力学习中,望大神们能够为小弟点出不对的地方。
想学习Java NIO的童鞋也可以借鉴一下我的代码。
愿与CSDN上的Coder们一起在技术的道路上飞奔。
以上是关于Java 基础 -- NIO 多人聊天室的主要内容,如果未能解决你的问题,请参考以下文章
基于Java NIO的多人在线聊天工具源码实现(登录,单聊,群聊)
Day389.使用JavaNIO实现简易在线多人聊天室 -NIO