Java 基础 -- NIO 多人聊天室

Posted 周无极

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 基础 -- NIO 多人聊天室相关的知识,希望对你有一定的参考价值。

package com.atguigu.nio.groupchat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

public class GroupChatServer 
    //定义属性
    private Selector selector;
    private ServerSocketChannel listenChannel;
    private static final int PORT = 6667;

    //构造器
    //初始化工作
    public GroupChatServer() 

        try 

            //得到选择器
            selector = Selector.open();
            //ServerSocketChannel
            listenChannel =  ServerSocketChannel.open();
            //绑定端口
            listenChannel.socket().bind(new InetSocketAddress(PORT));
            //设置非阻塞模式
            listenChannel.configureBlocking(false);
            //将该listenChannel 注册到selector
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);

        catch (IOException e) 
            e.printStackTrace();
        

    

    //监听
    public void listen() 

        System.out.println("监听线程: " + Thread.currentThread().getName());
        try 

            //循环处理
            while (true) 

                int count = selector.select();
                if(count > 0) //有事件处理

                    //遍历得到selectionKey 集合
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) 
                        //取出selectionkey
                        SelectionKey key = iterator.next();

                        //监听到accept
                        if(key.isAcceptable()) 
                            SocketChannel sc = listenChannel.accept();
                            sc.configureBlocking(false);
                            //将该 sc 注册到seletor
                            sc.register(selector, SelectionKey.OP_READ);

                            //提示
                            System.out.println(sc.getRemoteAddress() + " 上线 ");

                        
                        if(key.isReadable())  //通道发送read事件,即通道是可读的状态
                            //处理读 (专门写方法..)

                            readData(key);

                        
                        //当前的key 删除,防止重复处理
                        iterator.remove();
                    

                 else 
                    System.out.println("等待....");
                
            

        catch (Exception e) 
            e.printStackTrace();

        finally 
            //发生异常处理....

        
    

    //读取客户端消息
    private void readData(SelectionKey key) 

        //取到关联的channle
        SocketChannel channel = null;

        try 
           //得到channel
            channel = (SocketChannel) key.channel();
            //创建buffer
            ByteBuffer buffer = ByteBuffer.allocate(1024);

            int count = channel.read(buffer);
            //根据count的值做处理
            if(count > 0) 
                //把缓存区的数据转成字符串
                String msg = new String(buffer.array());
                //输出该消息
                System.out.println("form 客户端: " + msg);

                //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理
                sendInfoToOtherClients(msg, channel);
            

        catch (IOException e) 
            try 
                System.out.println(channel.getRemoteAddress() + " 离线了..");
                //取消注册
                key.cancel();
                //关闭通道
                channel.close();
            catch (IOException e2) 
                e2.printStackTrace();;
            
        
    

    //转发消息给其它客户(通道)
    private void sendInfoToOtherClients(String msg, SocketChannel self ) throws  IOException

        System.out.println("服务器转发消息中...");
        System.out.println("服务器转发数据给客户端线程: " + Thread.currentThread().getName());
        //遍历 所有注册到selector 上的 SocketChannel,并排除 self
        for(SelectionKey key: selector.keys()) 

            //通过 key  取出对应的 SocketChannel
            Channel targetChannel = key.channel();

            //排除自己
            if(targetChannel instanceof  SocketChannel && targetChannel != self) 

                //转型
                SocketChannel dest = (SocketChannel)targetChannel;
                //将msg 存储到buffer
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                //将buffer 的数据写入 通道
                dest.write(buffer);
            
        

    

    public static void main(String[] args) 

        //创建服务器对象
        GroupChatServer groupChatServer = new GroupChatServer();
        groupChatServer.listen();
    


//可以写一个Handler
class MyHandler 
    public void readData() 

    
    public void sendInfoToOtherClients()

    
package com.atguigu.nio.groupchat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

public class GroupChatClient 

    //定义相关的属性
    private final String HOST = "127.0.0.1"; // 服务器的ip
    private final int PORT = 6667; //服务器端口
    private Selector selector;
    private SocketChannel socketChannel;
    private String username;

    //构造器, 完成初始化工作
    public GroupChatClient() throws IOException 

        selector = Selector.open();
        //连接服务器
        socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
        //设置非阻塞
        socketChannel.configureBlocking(false);
        //将channel 注册到selector
        socketChannel.register(selector, SelectionKey.OP_READ);
        //得到username
        username = socketChannel.getLocalAddress().toString().substring(1);
        System.out.println(username + " is ok...");

    

    //向服务器发送消息
    public void sendInfo(String info) 

        info = username + " 说:" + info;

        try 
            socketChannel.write(ByteBuffer.wrap(info.getBytes()));
        catch (IOException e) 
            e.printStackTrace();
        
    

    //读取从服务器端回复的消息
    public void readInfo() 

        try 

            int readChannels = selector.select();
            if(readChannels > 0) //有可以用的通道

                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) 

                    SelectionKey key = iterator.next();
                    if(key.isReadable()) 
                        //得到相关的通道
                       SocketChannel sc = (SocketChannel) key.channel();
                       //得到一个Buffer
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        //读取
                        sc.read(buffer);
                        //把读到的缓冲区的数据转成字符串
                        String msg = new String(buffer.array());
                        System.out.println(msg.trim());
                    
                
                iterator.remove(); //删除当前的selectionKey, 防止重复操作
             else 
                //System.out.println("没有可以用的通道...");

            

        catch (Exception e) 
            e.printStackTrace();
        
    

    public static void main(String[] args) throws Exception 

        //启动我们客户端
        GroupChatClient chatClient = new GroupChatClient();

        //启动一个线程, 每个3秒,读取从服务器发送数据
        new Thread() 
            public void run() 

                while (true) 
                    chatClient.readInfo();
                    try 
                        Thread.currentThread().sleep(3000);
                    catch (InterruptedException e) 
                        e.printStackTrace();
                    
                
            
        .start();

        //发送数据给服务器端
        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNextLine()) 
            String s = scanner.nextLine();
            chatClient.sendInfo(s);
        
    


 

Java NIO实现的C/S模式多人聊天工具

小弟初学NIO,做了个控制台聊天工具,不知道代码写的如何,望大神们批评指点。

服务器端,两个线程,一个处理客户端请求和转发消息,另一个处理服务器管理员指令,上代码:

package kindz.onlinechat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

public class Server {

    private static List<SocketChannel> clientList = new LinkedList<SocketChannel>();// 客户端列表
    private static Selector clientManager = null;// 通道管理器
    private static ServerSocketChannel server = null;// 服务器通道
    private static ByteBuffer buff = ByteBuffer.allocate(1500);// 缓冲器
    private static int port = 3333;

    public static void main(String[] args) {
	if (args.length > 0) {
	    try {
		port = Integer.parseInt(args[0]);
	    } catch (NumberFormatException e) {
		System.out.println("端口号只能为数字");
		return;
	    }
	}

	try {
	    // 初始化失败直接退出
	    if (!init())
		return;

	    while (clientManager.isOpen()) {
		select();

		// 获取就绪的key列表
		Set<SelectionKey> keys = clientManager.selectedKeys();

		// 遍历事件并处理
		Iterator<SelectionKey> it = keys.iterator();
		while (it.hasNext()) {
		    SelectionKey key = it.next();
		    // 判断key是否有效
		    if (!key.isValid()) {
			it.remove();// 要移除
			continue;
		    }

		    if (key.isAcceptable()) {// 有请求
			accept(key);
		    } else if (key.isReadable()) {// 有数据
			broadcast(key);
		    }

		    it.remove();
		}
	    }
	} catch (ClosedSelectorException | CancelledKeyException e) {// 一定是其他线程关闭了管理器
	} finally {
	    try {
		if (clientManager != null)
		    clientManager.close();
	    } catch (IOException e) {
	    }

	    try {
		if (server != null)
		    server.close();
	    } catch (IOException e) {
	    }

	    closeAll();
	    System.out.println("服务器已停止");
	}
    }

    // 初始化

    private static boolean init() {
	System.out.println("服务器启动中...");

	try {
	    // 获取管理器
	    clientManager = Selector.open();
	} catch (IOException e) {
	    System.out.println("服务器启动失败,原因:通道管理器无法获取");
	    return false;
	}

	try {
	    // 打开通道
	    server = ServerSocketChannel.open();
	} catch (IOException e) {
	    System.out.println("服务器启动失败,原因:socket通道无法打开");
	    return false;
	}

	try {
	    // 绑定端口
	    server.socket().bind(new InetSocketAddress(port));
	} catch (IOException e) {
	    System.out.println("服务器启动失败,原因:端口号不可用");
	    return false;
	}

	try {
	    // 设置成非阻塞模式
	    server.configureBlocking(false);
	} catch (IOException e) {
	    System.out.println("服务器启动失败,原因:非阻塞模式切换失败");
	    return false;
	}

	try {
	    // 注册到管理器中,只监听接受连接事件
	    server.register(clientManager, SelectionKey.OP_ACCEPT);
	} catch (ClosedChannelException e) {
	    System.out.println("服务器启动失败,原因:服务器通道已关闭");
	    return false;
	}

	Thread service = new Thread(new ServerService(clientManager));// 提供管理员指令服务线程
	service.setDaemon(true);// 设置为后台线程
	service.start();

	System.out.println("服务器启动成功");
	return true;
    }

    // 等待事件
    private static void select() {
	try {
	    // 等待事件
	    clientManager.select();
	} catch (IOException e) {
	    // 忽略未知的异常
	}
    }

    // 此方法获取请求的socket通道并添加到客户端列表中,当然还要注册到管理器中
    private static void accept(SelectionKey key) {
	SocketChannel socket = null;

	try {
	    // 接受请求的连接
	    socket = ((ServerSocketChannel) key.channel()).accept();
	} catch (IOException e) {// 连接失败
	}

	if (socket == null)
	    return;

	SocketAddress address = null;

	try {
	    address = socket.getRemoteAddress();
	    // 注册
	    socket.configureBlocking(false);
	    socket.register(clientManager, SelectionKey.OP_READ);
	} catch (ClosedChannelException e) {// 注册失败
	    try {
		if (socket != null)
		    socket.close();
	    } catch (IOException e1) {
	    }
	    return;
	} catch (IOException e) {
	    try {
		if (socket != null)
		    socket.close();
	    } catch (IOException e1) {
	    }
	    return;
	}
	// 添加到客户端列表中
	clientList.add(socket);
	System.out.println("主机" + address + "连接到服务器");
    }

    // 此方法接收数据并发送个客户端列表的每一个人
    private static void broadcast(SelectionKey key) {
	SocketChannel sender = (SocketChannel) key.channel();
	// 方法结束不清理
	buff.clear();

	int status = -1;
	try {
	    // 读取数据
	    status = sender.read(buff);
	} catch (IOException e) {// 未知的io异常
	    status = -1;
	}

	if (status <= 0) {// 异常断开连接,并移除此客户端
	    remove(sender);
	    return;
	}

	// 发送给每一个人
	for (SocketChannel client : clientList) {
	    // 除了他或她自己
	    if (client == sender)
		continue;
	    buff.flip();
	    try {
		client.write(buff);
	    } catch (IOException e) {// 发送失败,移除此客户端
		remove(client);
	    }
	}
    }

    private static void remove(SocketChannel client) {
	SocketAddress address = null;// 存储主机地址信息

	clientList.remove(client);// 从列表中移除
	
	try {
	    address = client.getRemoteAddress();//获取客户端地址信息
	} catch (IOException e1) {
	}
	
	try {
	    client.close();// 关闭连接
	} catch (IOException e1) {
	}
	client.keyFor(clientManager).cancel();// 反注册
	System.out.println("与主机" + address + "断开连接");
    }

    // 关闭列表中全部通道
    private static void closeAll() {
	for (SocketChannel client : clientList) {
	    try {
		if (client != null)
		    client.close();
	    } catch (IOException e) {
	    }
	}
    }
}
客户端也是两个线程,一个循环等待接收消息,另一个处理用户输入以及发送:

package kindz.onlinechat;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class Client implements Runnable {

    private static String ip = null;
    private static String name = null;
    private static String serverHost="127.0.0.1";//服务器地址
    private static int port=3333;//服务器端口号
    private static SocketChannel socket = null;// 与服务器连接通道

    public static void main(String[] args) {
	if(args.length>0){
	    
	    if (args[0].indexOf(':') == -1) {
		System.err.println("目标地址格式不正确");
		return;
	    }

	    serverHost = args[0].split(":")[0];
	    try {
		port = Integer.parseInt(args[0].split(":")[1]);
	    } catch (NumberFormatException e) {
		System.err.println("端口号只能为数字");
		return;
	    }
	}
	
	try {
	    // 初始化失败退出程序
	    if (!init())
		return;

	    ByteBuffer buff = ByteBuffer.allocate(1500);// 字节缓冲器

	    while (socket.read(buff) != -1) {// 读取信息
		String msg = new String(buff.array(), 0, buff.position());// 转成字符串
		buff.clear();// 清理
		System.out.println(msg);
	    }

	} catch (IOException e) {
	    System.out.println("与服务器断开连接");
	} finally {
	    close();
	    System.out.println("程序已退出");
	}
    }

    // 输入线程
    @SuppressWarnings("resource")
    public void run() {
	try {
	    Scanner sc = new Scanner(System.in);
	    // 循环等待输入
	    while (sc.hasNextLine()) {
		String msg = sc.nextLine();

		if (".exit".equals(msg)){
		    socket.write(ByteBuffer.wrap((name+"-"+ip+"下线了").getBytes()));// 发送下线信息
		    break;
		}
		   

		msg = name + "-" + ip + ":" + msg;
		socket.write(ByteBuffer.wrap(msg.getBytes()));
	    }
	} catch (IOException e) {
	    System.out.println("与服务器断开连接");
	} finally {
	    close();
	}
    }

    // 初始化程序
    private static boolean init() {
	System.out.println("正在连接至服务器...");
	try {
	    socket = SocketChannel
		    .open(new InetSocketAddress(serverHost, port));// 打开通道
	} catch (IOException e) {
	    System.out.println("无法连接到服务器");
	    return false;
	}
	System.out.println("已连接至服务器");

	try {
	    InetAddress address = InetAddress.getLocalHost();// 获取本机网络信息
	    ip = address.getHostAddress();// 本机ip
	    name = address.getHostName();// 主机名
	    socket.write(ByteBuffer.wrap((name+"-"+ip+"上线了").getBytes()));// 发送上线信息
	} catch (IOException e) {
	    System.out.println("网络异常");
	    return false;
	}

	Thread thread = new Thread(new Client());
	thread.setDaemon(true);// 设置后台线程
	thread.start();
	return true;
    }

    // 关闭通道
    private static void close() {
	try {
	    if (socket != null)
		socket.close();
	} catch (IOException e) {
	}
    }
}
写完感觉还是挺简单的,但是本人一直从事java web开发,异常处理做的比较少,不知道我这个处理的怎么样。

由于没有那么多好基友帮忙测试,我还写了虚拟客户端来模拟好基友:

package kindz.onlinechat;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class VClient implements Runnable {

    private String ip = null;
    private String name = null;
    private String serverHost = "127.0.0.1";// 服务器ip地址
    private int port = 3333;// 服务器端口号
    private SocketChannel socket = null;// 与服务器连接通道
    private static String[] msgs = {
	    "大家好",
	    "好困啊",
	    "今天该干什么啊",
	    "我这任务好多,先不聊了",
	    "jQuery是继prototype之后又一个优秀的Javascript库。它是轻量级的js库 ,它兼容CSS3,还兼容各种浏览器(IE 6.0+, FF 1.5+, Safari 2.0+, Opera 9.0+),jQuery2.0及后续版本将不再支持IE6/7/8浏览器。jQuery使用户能更方便地处理HTML(标准通用标记语言下的一个应用)、events、实现动画效果,并且方便地为网站提供AJAX交互。jQuery还有一个比较大的优势是,它的文档说明很全,而且各种应用也说得很详细,同时还有许多成熟的插件可供选择。jQuery能够使用户的html页面保持代码和html内容分离,也就是说,不用再在html里面插入一堆js来调用命令了,只需要定义id即可[8]。",
	    "谁那有咖啡", "谁有时间帮我去取个快递", ".exit" };
    private Random random=new Random();

    public VClient(String serverHost, int port) {
	this.serverHost = serverHost;
	this.port = port;
    }

    public void run() {
	try {
	    // 初始化失败退出程序
	    if (!init())
		return;

	    ByteBuffer buff = ByteBuffer.allocate(1500);// 字节缓冲器

	    while (!Thread.interrupted()&&socket.read(buff) != -1) {// 读取信息,中断退出
		String msg = new String(buff.array(), 0, buff.position());// 转成字符串
		buff.clear();// 清理
		System.out.println(msg);
	    }

	} catch (IOException e) {
	    System.out.println("与服务器断开连接");
	} finally {
	    close();
	    System.out.println("程序已退出");
	}
    }

    // 初始化程序
    private boolean init() {
	System.out.println("正在连接至服务器...");
	try {
	    socket = SocketChannel
		    .open(new InetSocketAddress(serverHost, port));// 打开通道
	} catch (IOException e) {
	    System.out.println("无法连接到服务器");
	    return false;
	}
	System.out.println("已连接至服务器");

	try {
	    InetAddress address = InetAddress.getLocalHost();// 获取本机网络信息
	    ip = address.getHostAddress();// 本机ip
	    name = address.getHostName();// 主机名
	    socket.write(ByteBuffer.wrap((name + "-" + ip + "上线了").getBytes()));// 发送上线信息
	} catch (IOException e) {
	    System.out.println("网络异常");
	    return false;
	}

	Thread thread = new Thread(new Daemon());// 私有内部类
	thread.setDaemon(true);// 设置后台线程
	thread.start();
	return true;
    }

    // 关闭通道
    private void close() {
	try {
	    if (socket != null)
		socket.close();
	} catch (IOException e) {
	}
    }

    // 私有内部类、守护线程、输出用
    private class Daemon implements Runnable {

	public void run() {
	    try {
		// 自动循环发送消息
		while (true) {
		    String msg = msgs[random.nextInt(msgs.length)];// 随便拿一个写好的信息

		    if (".exit".equals(msg)) {
			socket.write(ByteBuffer.wrap((name + "-" + ip + "下线了")
				.getBytes()));// 发送下线信息
			break;
		    }

		    msg = name + "-" + ip + ":" + msg;
		    socket.write(ByteBuffer.wrap(msg.getBytes()));

		    TimeUnit.SECONDS.sleep(random.nextInt(9) + 2);//模拟用户输入过程,2-10秒,平均6秒发一次
		}
	    } catch (IOException e) {
		System.out.println("与服务器断开连接");
	    } catch (InterruptedException e) {
		System.out.println("与服务器断开连接");
	    } finally {
		close();
	    }
	}
    }

}
虚拟客户端只是个线程,我给它配了个管理器:

package kindz.onlinechat;

import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class VManager implements Runnable {

    private static String serverHost = "127.0.0.1";
    private static int port = 3333;
    private static int num = 3;// 初始化虚拟客户端数目
    private static int min = 5;// 最小虚拟客户端数目
    private static int max = 15;// 最大虚拟客户端数目
    private static Random random=new Random();

    @SuppressWarnings("resource")
    public static void main(String[] args) {

	if (args.length > 0) {

	    if (args[0].indexOf(':') == -1) {
		System.err.println("目标地址格式不正确");
		return;
	    }

	    serverHost = args[0].split(":")[0];
	    try {
		port = Integer.parseInt(args[0].split(":")[1]);
	    } catch (NumberFormatException e) {
		System.err.println("端口号只能为数字");
		return;
	    }
	}

	if (args.length > 1) {
	    try {
		num = Integer.parseInt(args[1]);
	    } catch (NumberFormatException e) {
		System.err.println("初始化数目只能为数字");
		return;
	    }
	}

	if (args.length > 2) {
	    try {
		min = Integer.parseInt(args[2]);
	    } catch (NumberFormatException e) {
		System.err.println("最小数目只能为数字");
		return;
	    }
	}

	if (args.length > 3) {
	    try {
		max = Integer.parseInt(args[3]);
	    } catch (NumberFormatException e) {
		System.err.println("最大数目只能为数字");
		return;
	    }
	}

	if (max < num) {
	    System.err.println("初始化数量不能大于最大数量");
	    return;
	}
	
	if (max < min) {
	    System.err.println("最小数量不能大于最大数量");
	    return;
	}
	
	Thread manager = new Thread(new VManager());
	manager.start();

	Scanner sc = new Scanner(System.in);

	String arg = null;// 指令
	while (sc.hasNextLine()) {
	    arg = sc.nextLine();// 输入指令

	    if ("shutdown".equals(arg)) {
		manager.interrupt();
		break;
	    } else {
		System.out.println("未知的指令");
	    }
	}
    }

    public void run() {
	ExecutorService manager = Executors.newFixedThreadPool(max);// 线程池管理器

	// 初始化几个客户端
	for (int i = 0; i < num; i++) {
	    manager.execute(new VClient(serverHost, port));
	}

	try {
	    int interval=(int)(2*48000.0/min)+1;//用户上线间隔时间范围
	    while (!Thread.interrupted()) {
		TimeUnit.MILLISECONDS.sleep(random.nextInt(interval));// 用户大概48秒会下线,尽量保持最少用户个数
		manager.execute(new VClient(serverHost, port));
	    }
	} catch (InterruptedException e) {
	} finally {
	    System.out.println("正在停止所有客户端...");
	    manager.shutdownNow();// 中断所有客户端
	}
    }
}
虚拟客户端大概平均6秒发一次消息,每次下线的几率是1/8,所以虚拟客户端大概在上线48秒左右的时候会下线,因此想要保证(只能是尽量保证)最小虚拟客户端数量,只要保证48秒内上线固定数量的用户即可,不过在这里,虚拟客户端增加的频率也是随机的,感觉更真实些。

本人想转Java底层的工作,因此在努力学习中,望大神们能够为小弟点出不对的地方。

想学习Java NIO的童鞋也可以借鉴一下我的代码。

愿与CSDN上的Coder们一起在技术的道路上飞奔。奋斗

以上是关于Java 基础 -- NIO 多人聊天室的主要内容,如果未能解决你的问题,请参考以下文章

基于Java NIO的多人在线聊天工具源码实现(登录,单聊,群聊)

NIO 多人聊天室

Day389.使用JavaNIO实现简易在线多人聊天室 -NIO

Day389.使用JavaNIO实现简易在线多人聊天室 -NIO

nio 代码实现简易多人聊天

Java网络编程基础— 基于TCP的NIO简单聊天系统