三.Netty入门到超神系列-聊天室案例

Posted 墨家巨子@俏如来

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了三.Netty入门到超神系列-聊天室案例相关的知识,希望对你有一定的参考价值。

前言

前面我们对NIO的三大核心做了学习,这章我们来基于NIO来做一个聊天室案例。

聊天室案例

先来看下我们要实现的效果

对于服务端而言需要做如下事情

  • selector监听客户端的链接
  • 如果有“读”事件,就从通道读取数据
  • 把数据转发给其他所有的客户端,要过滤掉发消息过来的客户端不用转发

对于客户端而言需要做如下事情

  • selector监听服务端的“读”事件
  • 如果有数据从通道中读取数据,打印到控制台
  • 监听键盘输入,向服务端发送消息

服务端代码

public class GroupChatServer {

    //选择器
    private Selector selector ;

    //服务端通道
    ServerSocketChannel serverSocketChannel ;

    //初始化服务端
    public GroupChatServer(){
        try {
            //创建选择器
            selector = Selector.open();
            //创建通道
            serverSocketChannel = ServerSocketChannel.open();
            //绑定监听端口
            serverSocketChannel.bind(new InetSocketAddress("127.0.0.1",5000));
            //配置为异步
            serverSocketChannel.configureBlocking(false);
            //注册通道到选择器
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //处理客户端监听
    public void listen(){
        //轮询监听
        while(true){

            //监听
            try {
                if(selector.select() > 0){
                    //监听到事件,获取到有事件的key
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();

                    Iterator<SelectionKey> iterator = selectionKeys.iterator();

                    while(iterator.hasNext()){
                        SelectionKey selectionKey = iterator.next();

                        //如果是“接收就绪”
                        if(selectionKey.isAcceptable()){
                            //注册通道
                            SocketChannel channel = serverSocketChannel.accept();
                            //设置为异步
                            channel.configureBlocking(false);
                            //注册通道,监听类型设置为:“读” ,并指定装数据的Buffer
                            channel.register(selector,SelectionKey.OP_READ,ByteBuffer.allocate(1024));
                            System.out.println("客户端:"+channel.getRemoteAddress()+" 上线啦...");
                        }

                        //如果是“读就绪”
                        if(selectionKey.isReadable()){
                            //读取数据 , 转发数据给其他客户
                            readAndForwardMessage(selectionKey);
                        }

                        //删除Key防止重复处理
                        iterator.remove();
                    }

                }else{
                    System.out.println("等待客户端加入...");
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    //读取数据
    public void readAndForwardMessage(SelectionKey selectionKey){
        SocketChannel channel = null;

        try {
            //得到通道
            channel = (SocketChannel) selectionKey.channel();
            //异步
            channel.configureBlocking(false);
            //准备缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            //把数据读取到缓冲区
            channel.read(byteBuffer);
            //打印结果
            String message = channel.getRemoteAddress()+" : "+new java.lang.String(byteBuffer.array());

            System.out.println(message);

            //转发消息
            forwardToOthers(channel , message);
        } catch (IOException e) {
            e.printStackTrace();
            try {
                //出现异常,关闭通道
                selectionKey.channel();
                if(channel != null){
                    channel.close();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    //转发数据
    public void forwardToOthers(SocketChannel self,String message){
        //转发给所有通道
        Set<SelectionKey> keys = selector.keys();

        Iterator<SelectionKey> iterator = keys.iterator();

        while(iterator.hasNext()){
            //拿到某一个客户端的key
            SelectionKey selectionKey = iterator.next();

            SelectableChannel selectableChannel = selectionKey.channel();

            //排除当前发消息的客户端不用转发
            if(selectableChannel instanceof SocketChannel && selectableChannel != self){
                //转发
                SocketChannel channel = (SocketChannel) selectableChannel;

                try {
                    channel.write(ByteBuffer.wrap(message.getBytes()));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    public static void main(String[] args) {
        new GroupChatServer().listen();
    }

}

客户端代码

public class GroupChatClient {

    //选择且
    private Selector selector ;

    //服务端通道
    SocketChannel socketChannel ;

    //初始化服务端
    public GroupChatClient(){
        try {
            //创建选择器
            selector = Selector.open();
            //创建通道
            socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",5000));

            //配置为异步
            socketChannel.configureBlocking(false);

            //注册通道到选择器,客户端监听读事件
            socketChannel.register(selector, SelectionKey.OP_READ);

            System.out.println(socketChannel.getLocalAddress()+":准备完毕");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //发送消息
    public void sendMessage(String message){
        try {
            socketChannel.write(ByteBuffer.wrap(message.getBytes()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //处理客户端监听
    public void readMessage(){
        while (true){
            try {
                if(selector.select() > 0){
                    //监听到有事件
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while(iterator.hasNext()){
                        SelectionKey key = iterator.next();
                        //如果是可读事件
                        if(key.isReadable()){
                            //获取通道
                            SocketChannel channel = (SocketChannel) key.channel();
                            channel.configureBlocking(false);
                            //读取通道数据
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            channel.read(byteBuffer);
                            //打印内容
                            System.out.println(new String(byteBuffer.array()));
                        }
                        //防止重复操作
                        iterator.remove();
                    }
                }else{
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                //e.printStackTrace();
                System.out.println("服务器关闭...");
            }
        }
    }

    public static void main(String[] args) {
        GroupChatClient groupChatClient = new GroupChatClient();
        
		//新开线程,专门处理服务端发送过来的消息,不然会阻塞主线程
        new Thread(()->{
            //读取消息
            groupChatClient.readMessage();
        }).start();

        //写消息 , 键盘输入
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNextLine()){
            String message = scanner.nextLine();
            //发消息
            groupChatClient.sendMessage(message);
        }
    }
}

文章结束希望对你有所帮助

以上是关于三.Netty入门到超神系列-聊天室案例的主要内容,如果未能解决你的问题,请参考以下文章

十.Netty入门到超神系列-基于WebSocket开发聊天室

三.Netty入门到超神系列-Java NIO 三大核心(selector,channel,buffer)

八.Netty入门到超神系列-Netty入门程序

二.Netty入门到超神系列-Java NIO 三大核心(selector,channel,buffer)

一.Netty入门到超神系列-BIONIOAIO的认识

六.Netty入门到超神系列-Java NIO零拷贝实战