4.基于NIO的群聊系统

Posted PacosonSWJTU

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了4.基于NIO的群聊系统相关的知识,希望对你有一定的参考价值。

【README】

1.本文总结自B站《netty-尚硅谷》,很不错;

2.文末有错误及解决方法;


【1】群聊需求

1)编写一个 NIO 群聊系统,实现服务器端和客户端之间的数据简单通讯(非
阻塞)
2)实现多人群聊;
3)服务器端:可以监测用户上线,离线,并实现消息转发功能;
4)客户端:通过channel 可以无阻塞发送消息给其它所有用户,同时可以接受
其它用户发送的消息(有服务器转发得到);


【2】概要设计

1)服务器端:

  •   服务器启动并监听 6667 ;
  •   服务器接收客户端消息,并实现转发,处理上线 与  离线;

2)客户端

  •   连接服务器;
  •   发送消息;
  •   接收服务器消息 ;

【3】代码实现及自测

【3.1】服务器端

/**
 * @Description 群聊服务器端 
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年08月19日
 */
public class NIOGchatServer 

    private Selector selector;
    private ServerSocketChannel listenChannel;
    private static final int PORT = 6667;

    /**
     * @description 构造器
     * @author xiao tang
     * @date 2022/8/19
     */
    public NIOGchatServer() 
        try 
            // 得到选择器
            selector = Selector.open();
            // 初始化 ServerSocketChannel
            listenChannel = ServerSocketChannel.open();
            // 绑定端口
            listenChannel.socket().bind(new InetSocketAddress(PORT));
            // 设置非阻塞模式
            listenChannel.configureBlocking(false);
            // 把listenChannel注册到selector,事件为 ACCEPT
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);
         catch (Exception e) 
            e.printStackTrace();
            System.out.println("群聊服务器构造异常");
        
    

    public static void main(String[] args) throws IOException 
        // 创建服务器对象,并监听端口
        new NIOGchatServer().listen();
    

    /**
     * @description 监听
     * @param
     * @author xiao tang
     * @date 2022/8/19
     */
    public void listen() throws IOException 
        while(true) 
            // 等待客户端请求连接
            selector.select();
            // 获取选择key集合
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) 
                SelectionKey key = iterator.next();
                if (key.isAcceptable())  // 通道发生连接事件
                    SocketChannel sc = listenChannel.accept();
                    sc.configureBlocking(false); // 设置为非阻塞
                    // 将 sc 注册到 selector 上
                    sc.register(selector, SelectionKey.OP_READ);
                    // 提示
                    System.out.println(sc.getRemoteAddress() + " connected successfully.");
                
                if (key.isReadable())  // 通道发生 read 事件
                    // 处理读
                    this.readData(key);
                
                // 用完之后,要移除key
                iterator.remove();
            
        
    
    
    /** 
     * @description 读取客户端消息
     * @param key 选择键
     * @return 
     * @author xiao tang 
     * @date 2022/8/19 
     */
    private void readData(SelectionKey key) 
        // 定义一个socketchannel
        SocketChannel channel = null;
        try 
            // 取到关联的channel
            channel = (SocketChannel) key.channel();
            // 创建缓冲 buffer
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = channel.read(buffer);
            // 根据count的值做处理
            if (count > 0) 
                // 把缓冲区的数据转为字符串并输出
                String msg = new String(buffer.array(), 0, count, StandardCharsets.UTF_8);
                // 输出该消息
                System.out.println(msg);
                // 向其他客户端转发消息
                this.forward2OtherClients(msg, channel);
            
         catch (IOException e) 
            e.printStackTrace();
            try 
                System.out.println(channel.getRemoteAddress() + " has been offline.");
                // 取消注册
                key.channel();
                // 关闭通道
                channel.close();
             catch (IOException e2) 
                e2.printStackTrace();
            
        
    

    /**
     * @description 消息转发给其他客户端
     * @param msg 消息
     * @param self 当前 SocketChannel
     * @author xiao tang
     * @date 2022/8/19
     */
    private void forward2OtherClients(String msg, SocketChannel self) throws IOException 
        // 遍历所有注册到 selector 上的 SocketChannel 并排除自己
        for (SelectionKey key : selector.keys()) 
            // 排除自己
            if (key.equals(self.keyFor(selector))) continue;
            // 通过key 取出对应的 SocketChannel
            Channel targetChannel = key.channel();
            // 消息转发
            if (targetChannel instanceof SocketChannel) 
                SocketChannel dest = (SocketChannel) targetChannel;
                // 把 msg 存储到buffer
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
                // 把buffer数据写入通道
                dest.write(buffer);
            
        
    
    

【3.2】客户端

/**
 * @Description NIO群聊客户端
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年08月19日
 */
public class NIOGchatClient 

    // 定义相关的属性
    private static final String HOST = "127.0.0.1"; // 服务器ip地址
    private static final int PORT = 6667; // 服务器端口
    private Selector selector;
    private SocketChannel socketChannel;
    private String userName;
    // 线程池
    private static ExecutorService THREAD_POOL = Executors.newCachedThreadPool();

    public static void main(String[] args) 
        try 
            // 启动客户端
            NIOGchatClient client = new NIOGchatClient();
            // 启动一个线程,每隔3秒读取服务器发送的数据
            THREAD_POOL.submit(new Runnable() 
                @Override
                public void run() 
                    while(true) 
                        try 
                            client.read();
                            TimeUnit.SECONDS.sleep(3);
                         catch (Exception e) 
                            e.printStackTrace();
                            break;
                        
                    
                
            );

            // 客户端接收控制台输入,并发送数据给服务器
            Scanner scanner = new Scanner(System.in);
            while(scanner.hasNextLine()) 
                client.send(scanner.nextLine());
            
         catch (Exception e) 
            e.printStackTrace();
         finally 
            THREAD_POOL.shutdown();
        
    

    /**
     * @description 构造器
     * @author xiao tang
     * @date 2022/8/19
     */
    public NIOGchatClient() throws IOException 
        this.selector = Selector.open();
        // 连接服务器
        socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
        // 设置非阻塞
        socketChannel.configureBlocking(false);
        // 将 channel 注册到 selector,事件 READ
        socketChannel.register(this.selector, SelectionKey.OP_READ);
        // 得到userName
        userName = socketChannel.getLocalAddress().toString().substring(1);
        System.out.println(userName + " connected server successfully.");
    

    /**
     * @description 发送消息到服务器
     * @param msg 消息
     * @author xiao tang
     * @date 2022/8/19
     */
    private void send(String msg) 
        msg = userName + ":" + msg;
        ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
        try 
            socketChannel.write(buffer);
         catch (IOException e) 
            e.printStackTrace();
        
    

    /**
     * @description 从通道读取数据并显示
     * @author xiao tang
     * @date 2022/8/20
     */
    private void read() throws IOException 
        selector.select();
        // 存在可用通道,读取数据并显示 (注意这里是 selector.selectedKeys() 而不是 selector.keys() )
        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
        while (iterator.hasNext()) 
            SelectionKey key = iterator.next();
            // 若可读通道,则读取
            if (key.isReadable()) 
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int count = sc.read(buffer);
                System.out.println(new String(buffer.array(), 0, count , StandardCharsets.UTF_8));
            
            // 用完key要移除
            iterator.remove();
        
    

【3.3】测试效果


【4】报错及解决

1)问题1:为什么要移除key ?

// 用完之后,要移除key
iterator.remove();

refer2 Why the key should be removed in `selector.selectedKeys().iterator()` in java nio? - Stack Overflow

There are 2 tables in the selector:

  1. registration table: when we call channel.register, there will be a new item(key) into it. Only if we call key.cancel(), it will be removed from this table.

  2. ready for selection table: when we call selector.select(), the selector will look up the registration table, find the keys which are available, copy the references of them to this selection table. The items of this table won't be cleared by selector(that means, even if we call selector.select() again, it won't clear the existing items)

That's why we have to invoke iter.remove() when we got the key from selection table. If not, we will get the key again and again by selector.selectedKeys() even if it's not ready to use.

大意就是:选择器中有2个表,分别是 表1是注册表; 表2是就绪选择表

调用 selector.select() 时, 注册表1中对应通道有事件的key 会被拷贝到就绪选择表2;而 选择器不会清理表2的key;即便我们重复调用 selector.select() 时,它也不会清理表2的key;

这也就是为什么我们从选择表2中获得key后,会调用 it.remove() 清理掉key;如果不清理,我们重复调用 selector.selectedKeys() 时,还是会获取之前的key,即便这些key对应 通道没有事件,这就会导致报空指针

2)分清楚 selector.selectedKeys() 和 selector.keys() 的 区别

  • selector.selectedKeys():获取有事件发生的通道对应的键集合,如 ACCEPT事件,READ事件;
  • selector.keys():获取注册到当前选择器的所有通道对应的key集合;(因为通道要先实现多路复用,就需要注册到选择器,选择器会产生一个key,与通道关联起来);

3)为什么客户端或服务器在读取缓冲区的内容时,我要通过offset + 长度去获取?如 代码:

// 若可读通道,则读取
if (key.isReadable()) 
    SocketChannel sc = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int count = sc.read(buffer);
    System.out.println(new String(buffer.array(), 0, count , StandardCharsets.UTF_8));

【代码解说】

  • 上述代码的最后一行,offset 等于0, 长度是count;
  • 因为如果不使用 count 限定buffer范围的话,打印出来有很多换行。(当然是我的测试案例里是有换行 ,有兴趣的同学可以自己试下);
  • 加了count,限定范围后,就没有换行了。

以上是关于4.基于NIO的群聊系统的主要内容,如果未能解决你的问题,请参考以下文章

Linux下基于TCP协议的群聊系统设计(多线程+select)

基于NIO实现后台简易群聊系统

基于NIO实现后台简易群聊系统

Swoole实现基于WebSocket的群聊私聊

#yyds干货盘点# 基于Netty,手写一个群聊系统

企业微信如何解散自己建的群聊