netty入门demo

Posted 好大的月亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty入门demo相关的知识,希望对你有一定的参考价值。

jdk原生的socket实现nio太麻烦,而且有很多问题,netty经过了大厂认证,就选它了.

netty server

package com.chan.netty.service;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioserverSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

/**
 * ClassName: NettyServerService
 * Description:
 * date: 2022/7/20 11:29
 * 要启动一个Netty服务端,必须要指定三类属性,
 * 分别是线程模型、IO 模型、连接读写处理逻辑,
 * 有了这三者,之后在调用bind(8000),我们就可以在本地绑定一个 8000 端口启动起来
 * @author fchen
 */
public class NettyServerService 


    public static void main(String[] args) 

        ServerBootstrap serverBootstrap = new ServerBootstrap();

        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();

        //这两个对象可以看做是传统IO编程模型的两大线程组,boss表示监听端口,accept新连接的线程组,worker表示处理每一条连接的数据读写的线程组。
        //引导类 ServerBootstrap,这个类将引导我们进行服务端的启动工作。
        //通过.group(bossGroup, workerGroup)给引导类配置两大线程组,这个引导类的线程模型也就定型了。
        //通过.channel(NioServerSocketChannel.class)来指定 IO 模型为NIO,当然,这里也有其他的选择,如果你想指定 IO 模型为 BIO,那么这里配置上OioServerSocketChannel.class类型即可,当然通常我们也不会这么做,因为Netty的优势就在于NIO。
        //调用childHandler()方法,给这个引导类创建一个ChannelInitializer,这里主要就是定义后续每条连接的数据读写,业务处理逻辑。

        serverBootstrap
                .group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() 

                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception 
                        nioSocketChannel.pipeline().addLast(new StringDecoder());
                        nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() 
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception 
                                System.out.println(msg);
                            
                        );
                    
                )
                .bind(8000);
    

netty client

package com.chan.netty.service;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Date;

/**
 * ClassName: NettyClientService
 * Description:
 * date: 2022/7/20 11:33
 *
 * @author fchen
 */
public class NettyClientService 


    public static void main(String[] args) throws InterruptedException 

        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup();

        //Bootstrap,负责启动客户端以及连接服务端;而我们在描述服务端的启动的时候,这个引导类是 ServerBootstrap;
        //通过bootstrap.group(group)指定线程模型;
        //我们指定 IO 模型为 NioSocketChannel,表示 IO 模型为 NIO。也可以设置 IO 模型为 OioSocketChannel,但是通常不会这么做,因为 Netty 的优势在于 NIO;
        //给引导类指定一个 handler,这里主要就是定义连接的业务处理逻辑;

        bootstrap
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() 

                    @Override
                    protected void initChannel(Channel channel) throws Exception 
                        channel.pipeline().addLast(new StringEncoder());
                    
                );

        //connect 方法有两个参数,第一个参数可以填写 IP 或者域名,第二个参数填写的是端口号
        //由于 connect 方法返回的是一个 Future,也就是说这个方是异步的,我们通过 addListener 方法可以监听到连接是否成功,进而打印出连接信息

        Channel channel = bootstrap.connect("127.0.0.1", 8000).addListener(future -> 
            if (future.isSuccess()) 
                System.out.println("连接成功");
             else 
                System.out.println("连接失败");

                //重新连接
                bootstrap.connect("127.0.0.1", 8000);
            

        ).channel();


        while (true) 
            channel.writeAndFlush(new Date() + ":hello world");
            Thread.sleep(2000);
        

    

客户端和服务端分离

服务端

package com.chan.netty.impl;

import com.chan.netty.impl.handler.ServerChannelInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

/**
 * ClassName: NettyServer
 * Description:
 * date: 2022/7/20 16:24
 *
 * @author fchen
 */
@Slf4j
public class NettyServer 

    public void startListener(String url, Integer port)

        InetSocketAddress socketAddress = new InetSocketAddress(url, port);

        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap
                .group(boss, worker)
                .channel(NioServerSocketChannel.class)
                //自定义初始化策略
                .childHandler(new ServerChannelInitializer())
                .localAddress(socketAddress)
                //设置队列大小,未设置或所设置的值小于1,将使用默认值 50
                //当服务器请求处理线程全满时,用于临时存放已完成 三次握手 的请求的队列的最大长度
                .option(ChannelOption.SO_BACKLOG, 1024)
                //两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        try 
            //绑定端口,开始接收进来的连接
            ChannelFuture channelFuture = serverBootstrap.bind(socketAddress).sync().addListener(future -> 
                if(future.isSuccess())
                    log.info("监听端口:成功", port);
                else 
                    log.error("监听端口:失败", port);
                
            );

            channelFuture.channel().closeFuture().sync().addListener(future -> 
                if(future.isSuccess())
                    log.info("关闭成功");
                else 
                    log.error("关闭失败");
                
            );

         catch (InterruptedException e) 
            log.error("监听地址:,端口:,失败;异常:",url, port, e.getMessage());
         finally 
            //关闭主线程组
            boss.shutdownGracefully();
            //关闭工作线程组
            worker.shutdownGracefully();
        
    

服务端自定义初始化策略

package com.chan.netty.impl.handler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
 * ClassName: ServerChannelInitializer
 * Description:
 * date: 2022/7/20 16:27
 *
 * @author fchen
 */
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> 

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception 
    //判断看是否有"\\n"或者"\\r\\n",如果有就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行,它是以换行符为结束标志的解码器
        //LineBasedFrameDecoder + StringDecoder组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包问题
        socketChannel.pipeline().addLast(new LineBasedFrameDecoder(102400, true, true));

        //maxLength:超过1024字节长度时仍没有发现\\n或者\\r\\n时将会抛出TooLongFrameException
        //stripDelimiter:解码后的消息是否去除分隔符。
        //failFast:与maxLength联合使用,表示超过maxLength后,抛出TooLongFrameException的时机。如果为true,则超出maxLength后立即抛出TooLongFrameException,不继续进行解码;如果为false,则等到完整的消息被解码后,再抛出TooLongFrameException异常。
        //delimiter:分隔符
        //socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1 << 10, true, true, Unpooled.copiedBuffer("XXX".getBytes())));

        //StringDecoder的功能非常简单,就是将接收到的对象转换成字符串
        socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));

        //自定义class,处理监听到的数据
        socketChannel.pipeline().addLast(new NettyServerHandler());
    

服务端自定义class处理收到的数据

package com.chan.netty.impl.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.time.LocalDateTime;

/**
 * ClassName: NettyServerHandler
 * Description:
 * date: 2022/7/20 16:29
 *
 * @author fchen
 */
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter 


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        log.info("建立连接,channel active channelId:", ctx.channel().id());
    

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        log.info("服务端收到消息:", msg);
    

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception 
        String response = "服务端响应,时间:" + LocalDateTime.now() + ",channelId:" + ctx.channel().id();
        log.info(response);
        ctx.writeAndFlush(response);
    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        log.error("服务端发生异常:", cause.getMessage());
        ctx.close();
    


客户端

客户端连接和发送消息

package com.fchan.netty.impl;

import com.fchan.netty.impl.handler.NettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * ClassName: NettyClient
 * Description:
 * date: 2022/7/20 16:51
 *
 * @author fchen
 */
@Component
@Slf4j
public class NettyClient 


    private String url = "127.0.0.1";

    private Integer port = 8000;

    private Channel channel;
    private EventLoopGroup group;

    public NettyClient() 
    

    public NettyClient(String url, Integer port) 
        this.url = url;
        this.port = port;
    

    public String getUrl() 
        return url;
    

    public Integer getPort() 
        return port;
    


    public void sendMsg(String msg)

        if(!this.channel.isActive() || !this.channel.isOpen())
            this.connect();
        

        this.channel.writeAndFlush(msg).addListener(future -> 
            if(future.isSuccess())
                log.info("发送消息成功:", msg);
             else 
                log.error("发送消息失败:", msg);
            
        );

    


    private void close()
        try 
            channel.closeFuture().sync().addListener(future -> 
                if(future.isSuccess())
                    log.info("关闭成功");
                else 
                    log.info("关闭失败");
                
            );

         catch (InterruptedException e) 
            log.error("close() called with parameters => , exception = 【】", e.getMessage());
        finally 
            //以一种优雅的方式进行线程退出
          

以上是关于netty入门demo的主要内容,如果未能解决你的问题,请参考以下文章

第一个netty程序

即时通讯开发之Netty入门长文

原创NIO框架入门:Android与MINA2Netty4的跨平台UDP双向通信实战

Spring之IOC/DI(反转控制/依赖注入)_入门Demo

Netty实现心跳机制demo

高性能NIO框架Netty-对象传输