Netty 实现聊天功能

Posted Java面试通关手册

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty 实现聊天功能相关的知识,希望对你有一定的参考价值。

对Netty不了解可以查看我的上一篇文章:

例子来源:https://waylau.com/ (推荐老卫的网站,感觉挺不错),另外点击阅读原文即可查看原文。

Netty 实现聊天功能

环境要求:

  • JDK 7+


  • Maven 3.2.x


  • Netty 4.x


开始编码:

让我们从 handler (处理器)的实现开始,handler 是由 Netty 生成用来处理 I/O 事件的。

SimpleChatServerHandler.java

 
   
   
 
  1. /**

  2. *服务端 channel

  3. *SimpleChannelInboundHandler实现了 ChannelInboundHandler 接口,

  4. *ChannelInboundHandler 提供了许多事件处理的接口方法,然后你可以覆盖这些方法。现在仅仅只需要继承 SimpleChannelInboundHandler 类而不是你自己去实现接口方法。

  5. */

  6. public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1)

  7.    /**

  8.     * A thread-safe Set Using ChannelGroup, you can categorize Channels into a

  9.     * meaningful group. A closed Channel is automatically removed from the

  10.     * collection,

  11.     */

  12.    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

  13.     /**

  14.      * 每当从服务端收到新的客户端连接时,客户端的 Channel 存入 ChannelGroup 列表中,并通知列表中的其他客户端 Channel

  15.      */

  16.    @Override

  17.    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

  18.        Channel incoming = ctx.channel();

  19.        // Broadcast a message to multiple Channels

  20.        channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");

  21.        channels.add(ctx.channel());

  22.    }

  23.    /**

  24.     * 每当从服务端收到客户端断开时,客户端的 Channel 自动从 ChannelGroup 列表中移除了,并通知列表中的其他客户端 Channel

  25.     */

  26.    @Override

  27.    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3)

  28.        Channel incoming = ctx.channel();

  29.        //将消息广播到多个Channel

  30.        channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开\n");

  31.        // 一个关闭的Channel将自动从ChannelGroup中移除,

  32.        // so there is no need to do "channels.remove(ctx.channel());"

  33.    }

  34.    /**

  35.     * 每当从服务端读到客户端写入信息时,将信息转发给其他客户端的 Channel。

  36.     * 如果你使用的是 Netty 5.x 版本时,需要把 channelRead0() 重命名为messageReceived()

  37.     */

  38.    @Override

  39.    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4)

  40.        Channel incoming = ctx.channel();

  41.        for (Channel channel : channels) {

  42.            if (channel != incoming) {

  43.                channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n");

  44.            } else {

  45.                channel.writeAndFlush("[you]" + s + "\n");

  46.            }

  47.        }

  48.    }

  49.    /**

  50.     * 服务端监听到客户端活动

  51.     */

  52.    @Override

  53.    public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)

  54.        Channel incoming = ctx.channel();

  55.        System.out.println("SimpleChatClient:" + incoming.remoteAddress() + "在线");

  56.    }

  57.    /**

  58.     * 服务端监听到客户端不活动

  59.     */

  60.    @Override

  61.    public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)

  62.        Channel incoming = ctx.channel();

  63.        System.out.println("SimpleChatClient:" + incoming.remoteAddress() + "掉线");

  64.    }

  65.    /**

  66.     * exceptionCaught() 事件处理方法是当出现 Throwable 对象才会被调用,

  67.     * 即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时。

  68.     * 在大部分情况下,捕获的异常应该被记录下来并且把关联的 channel 给关闭掉。

  69.     * 然而这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。

  70.     */

  71.    @Override

  72.    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

  73.        Channel incoming = ctx.channel();

  74.        System.out.println("SimpleChatClient:" + incoming.remoteAddress() + "异常");

  75.        // 当出现异常就关闭连接

  76.        cause.printStackTrace();

  77.        ctx.close();

  78.    }

  79. }

SimpleChatServerInitializer 用来增加多个的处理类到 ChannelPipeline 上,包括编码、解码、SimpleChatServerHandler 等。

SimpleChatServerInitializer.java

 
   
   
 
  1. /**

  2. * 服务端 ChannelInitializer

  3. * 用来增加多个的处理类到 ChannelPipeline 上,包括编码、解码、SimpleChatServerHandler 等。

  4. */

  5. public class SimpleChatServerInitializer extends

  6.        ChannelInitializer<SocketChannel> {

  7.    @Override

  8.    public void initChannel(SocketChannel ch) throws Exception {

  9.         ChannelPipeline pipeline = ch.pipeline();

  10.        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));

  11.        pipeline.addLast("decoder", new StringDecoder());

  12.        pipeline.addLast("encoder", new StringEncoder());

  13.        pipeline.addLast("handler", new SimpleChatServerHandler());

  14.        System.out.println("SimpleChatClient:"+ch.remoteAddress() +"连接上");

  15.    }

  16. }

编写一个 main() 方法来启动服务端。

SimpleChatServer.java

 
   
   
 
  1. /**

  2. * 简单聊天服务器-服务端

  3. */

  4. public class SimpleChatServer {

  5.    private int port;

  6.    public SimpleChatServer(int port) {

  7.        this.port = port;

  8.    }

  9.    public void run() throws Exception {

  10.        //在这个例子中我们实现了一个服务端的应用,因此会有2个 NioEventLoopGroup 会被使用。

  11.        //第一个经常被叫做‘boss’,用来接收进来的连接。

  12.        //第二个经常被叫做‘worker’,用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。

  13.        EventLoopGroup bossGroup = new NioEventLoopGroup();

  14.        EventLoopGroup workerGroup = new NioEventLoopGroup();

  15.        try {

  16.              //启动 NIO 服务的辅助启动类

  17.            ServerBootstrap serverBootstrap = new ServerBootstrap();

  18.            //用于处理ServerChannel和Channel的所有事件和IO。

  19.            serverBootstrap.group(bossGroup, workerGroup)

  20.             .channel(NioserverSocketChannel.class)

  21.             .childHandler(new SimpleChatServerInitializer())  

  22.             .option(ChannelOption.SO_BACKLOG, 128)          

  23.             .childOption(ChannelOption.SO_KEEPALIVE, true);

  24.            System.out.println("SimpleChatServer 启动了");

  25.            // 绑定端口,开始接收进来的连接

  26.            ChannelFuture f = serverBootstrap.bind(port).sync(); // (7)

  27.            // 等待服务器  socket 关闭 。

  28.            // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。

  29.            f.channel().closeFuture().sync();

  30.        } finally {

  31.            workerGroup.shutdownGracefully();

  32.            bossGroup.shutdownGracefully();

  33.            System.out.println("SimpleChatServer 关闭了");

  34.        }

  35.    }

  36.    public static void main(String[] args) throws Exception {

  37.        int port;

  38.        if (args.length > 0) {

  39.            port = Integer.parseInt(args[0]);

  40.        } else {

  41.            port = 8080;

  42.        }

  43.        new SimpleChatServer(port).run();

  44.    }

  45. }

客户端的处理类比较简单,只需要将读到的信息打印出来即可

SimpleChatClientHandler.java

 
   
   
 
  1. /**

  2. *客户端 channel

  3. *客户端的处理类比较简单,只需要将读到的信息打印出来即可

  4. */

  5. public class SimpleChatClientHandler extends  SimpleChannelInboundHandler<String> {

  6.    @Override

  7.    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {

  8.        System.out.println(s);

  9.    }

  10. }

与服务端类似

SimpleChatClientInitializer.java

 
   
   
 
  1. /**

  2. * 客户端 ChannelInitializer

  3. */

  4. public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {

  5.    @Override

  6.    public void initChannel(SocketChannel ch) throws Exception {

  7.        ChannelPipeline pipeline = ch.pipeline();

  8.        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));

  9.        pipeline.addLast("decoder", new StringDecoder());

  10.        pipeline.addLast("encoder", new StringEncoder());

  11.        pipeline.addLast("handler", new SimpleChatClientHandler());

  12.    }

  13. }

编写一个 main() 方法来启动客户端

SimpleChatClient.java

 
   
   
 
  1. /**

  2. * 简单聊天服务器-客户端

  3. */

  4. public class SimpleChatClient {

  5.    public static void main(String[] args) throws Exception{

  6.        new SimpleChatClient("localhost", 8080).run();

  7.    }

  8.    private final String host;

  9.    private final int port;

  10.    public SimpleChatClient(String host, int port){

  11.        this.host = host;

  12.        this.port = port;

  13.    }

  14.    public void run() throws Exception{

  15.        EventLoopGroup group = new NioEventLoopGroup();

  16.        try {

  17.            Bootstrap bootstrap  = new Bootstrap()

  18.                    .group(group)

  19.                    .channel(NioSocketChannel.class)

  20.                    .handler(new SimpleChatClientInitializer());

  21.            Channel channel = bootstrap.connect(host, port).sync().channel();

  22.            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));

  23.            while(true){

  24.                channel.writeAndFlush(in.readLine() + "\r\n");

  25.            }

  26.        } catch (Exception e) {

  27.            e.printStackTrace();

  28.        } finally {

  29.            group.shutdownGracefully();

  30.        }

  31.    }

  32. }

效果:


以上是关于Netty 实现聊天功能的主要内容,如果未能解决你的问题,请参考以下文章

Netty网络聊天 聊天室实战

Netty轻松实现聊天室,附带数据记录,聊天历史

《Netty+JavaFx实战:仿桌面版微信聊天》代码开源上云部署视频讲解,只为让你给点个Star!

灰常详细的Netty实现聊天室的代码解析

基于Netty的简单多人聊天程序(服务端)

实现分布式服务注册及简易的netty聊天