分布式理论,架构设计 Netty高级应用

Posted 拐柒

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式理论,架构设计 Netty高级应用相关的知识,希望对你有一定的参考价值。

Netty高级应用

HTTP服务器开发

Netty的HTTP协议栈无论在性能还是可靠性上,都表现优异,非常适合在非Web容器的场景下应用,相比于传统的Tomcat、Jetty等Web容器,它更加轻量和小巧,灵活性和定制性也更好。

服务器开发

目标:
1.Netty 服务器在 8080 端口监听
2.浏览器发出请求 "http://localhost:8080/ "
3.服务器可以回复消息给客户端 "我是Netty服务器 " ,并对特定请求资源进行过滤.

代码实现

NettyHttpServer

public void run() throws InterruptedException {
        EventLoopGroup boosGroup = null;
        EventLoopGroup workGroup = null;
        try {
            //        1.创建bossGroup线程组: 处理网络事件--连接事件
            boosGroup = new NioEventLoopGroup(1);
//        2.创建workerGroup线程组: 处理网络事件--读写事件  2*处理器线程数
            workGroup = new NioEventLoopGroup();
//        3.创建服务端启动助手
            ServerBootstrap serverBootstrap= new ServerBootstrap();
            //        4.设置bossGroup线程组和workerGroup线程组
            serverBootstrap.group(boosGroup,workGroup)
                    .channel(NioserverSocketChannel.class)//        5.设置服务端通道实现为NIO
                    .option(ChannelOption.SO_BACKLOG,128)//        6.参数设置
                    .childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)//        6.参数设置
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {



                            ChannelPipeline pipeline = channel.pipeline();
                            //添加解码器
    //                pipeline.addLast("messageDecoder",new MessageDecode());
    //                pipeline.addLast("messageEncoder",new MessageEnCode());
                            //添加编解码器
//                            pipeline.addLast(new MessageCodec());
                            pipeline.addLast(new HttpServerCodec());
                            //TODO
                            pipeline.addLast(new HttpServerHandler());
                        }
                    });//        7.创建一个通道初始化对象
//        9.启动服务端并绑定端口,同时将异步改为同步
            ChannelFuture future = serverBootstrap.bind(port);
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if(channelFuture.isSuccess()){
                        System.out.println("端口绑定成功");
                    }else {
                        System.out.println("端口绑定失败");
                    }
                }
            });
            System.out.println("http服务端启动完成");
//        10.关闭通道(并不是真正意义上的关闭,而是监听通道关闭的状态)和关闭连接池
            future.channel().closeFuture().sync();
        }  finally {
            boosGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }

    }

HttpServerHandler

public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    /**
     * 读取就绪事件
     * @param channelHandlerContext
     * @param httpObject
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
        //判断请求是否为HTTP请求
         if(httpObject instanceof HttpRequest){
             DefaultHttpRequest request= (DefaultHttpRequest) httpObject;
             System.out.println("浏览器请求地址"+request.uri());
             if("/favicon.ico".equals(request.uri())){
                 System.out.println("图标不相应");
                 return;
             }
             ByteBuf byteBuf= Unpooled.copiedBuffer("我是Netty服务器".getBytes(CharsetUtil.UTF_8));
            //给浏览器相应
             DefaultFullHttpResponse response=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,byteBuf);
             //设置响应头
             response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/html;charset=utf-8");
             response.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteBuf.readableBytes());
             channelHandlerContext.writeAndFlush(response);
         }
    }
}

网页版聊天室

基于websocket进行网页版聊天室开发

websocket

WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket使得客户端和服务器之间 的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,客户端和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

WebSocket和HTTP的区别

http协议是用在应用层的协议,他是基于tcp协议的,http协议建立连接也必须要有三次握手才能发送信息。 http连接分为短连接,长连接,短连接是每次请求都要三次握手才能发送自己的信息。即每一个request对应一个response。长连接是在一定的期限内保持连接。保持TCP连接不断开。客户端与服 务器通信,必须要有客户端先发起, 然后服务器返回结果。客户端是主动的,服务器是被动的。 客户端要想实时获取服务端消息就得不断发送长连接到服务端.
WebSocket实现了多路复用,他是全双工通信。在webSocket协议下服务端和客户端可以同时发送 信息。 建立了WebSocket连接之后, 服务端可以主动发送信息到客户端。而且信息当中不必在带有head 的部分信息了与http的长链接通信来说,这种方式,不仅能降低服务器的压力。而且信息当中也减少了 部分多余的信息

代码实现springboot+netty

NettyWebSocketServer

@Component
public class NettyWebSocketServer implements Runnable{
    @Autowired
    NettyConfig nettyConfig;
    private EventLoopGroup bossGroup=new NioEventLoopGroup(1);
    private EventLoopGroup workerGroup=new NioEventLoopGroup();
    @Autowired
    WebSocketChannelInit webSocketChannelInit;

    /**
     * 资源关闭,在容器销毁时关闭
     */
    @PreDestroy
    public void close(){
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
    @Override
    public void run() {
        try {
        //创建服务端启动助手
        ServerBootstrap bootstrap=new ServerBootstrap();
        //设置线程组
        bootstrap.group(bossGroup,workerGroup);
        // 设置参数
        bootstrap.channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.DEBUG))
                .childHandler(webSocketChannelInit);
        //启动
        ChannelFuture channelFuture = bootstrap.bind(nettyConfig.getPort()).sync();
        channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

WebSocketChannelInit

@Component
public class WebSocketChannelInit  extends ChannelInitializer {
    @Autowired
    WebSocketHandler webSocketHandler;
    @Autowired
    NettyConfig nettyConfig;
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        //对http协议的支持
        pipeline.addLast(new HttpServerCodec());
        //对大数据请求流的支持
        pipeline.addLast(new ChunkedWriteHandler());
        //HttpObjectAggregator将多个信息转化为多个request或者response
        //post请求分为三部分,  request line、request header 、message body
        pipeline.addLast(new HttpObjectAggregator(8000));
        //将http协议升级为ws协议,对websocket的支持
        pipeline.addLast(new WebSocketServerProtocolHandler(nettyConfig.getPath()));
        //自定义处理handler
        pipeline.addLast(webSocketHandler);
    }
}

WebSocketHandler

@Component
@ChannelHandler.Sharable //设置通道共享
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    public static List<Channel> list=new ArrayList<>();
    /**
     * 读就绪事件
     * @param channelHandlerContext
     * @param textWebSocketFrame
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
        String text = textWebSocketFrame.text();
        System.out.println("消息:"+text);
        Channel channel = channelHandlerContext.channel();
        for (Channel channel1 : list) {
            if(channel!=channel1){
                channel1.writeAndFlush(new TextWebSocketFrame(text));
            }
        }
    }
    /**
     * 客户端连接时
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        list.add(channel);
        System.out.println("[server]:"+channel.remoteAddress().toString().substring(1)+"上线了");
    }
    /**
     * 通道未就绪,下线了
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //客户端断开连接,就移除对应通道
        list.remove(channel);
    }
    /**
     * 异常处理时间
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        Channel channel = ctx.channel();
        list.remove(channel);
    }
}

NettySpringbootApplication

@SpringBootApplication
public class NettySpringbootApplication implements CommandLineRunner {

    @Autowired
    NettyWebSocketServer webSocketServer;

    public static void main(String[] args) {
        SpringApplication.run(NettySpringbootApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        new Thread(webSocketServer).start();
    }
}

NettyConfig

@Component
@ConfigurationProperties(prefix = "netty")
@Data
public class NettyConfig {

    private int port;//netty监听的端口

    private String path;//websocket访问路径
}

websocket

 var ws = new WebSocket("ws://localhost:8081/chat");
    ws.onopen = function () {
        console.log("连接成功.")
    }
    ws.onmessage = function (evt) {
        showMessage(evt.data);
    }
    ws.onclose = function (){
        console.log("连接关闭")
    }

    ws.onerror = function (){
        console.log("连接异常")
    }

netty中的粘包和拆包

粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。
TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包 问题。
TCP粘包和拆包产生的原因:
数据从发送方到接收方需要经过操作系统的缓冲区,而造成粘包和拆包的主要原因就在这个缓冲区 上。粘包可以理解为缓冲区数据堆积,导致多个请求数据粘在一起,而拆包可以理解为发送的数据大于 缓冲区,进行拆分处理。

粘包和拆包的解决办法

1.业内解决方案
由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。
· 消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到了一个完整的信息
· 将换行符作为消息结束符
· 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符通过在消息头中定义长度字段来标识消息的总长度

2.Netty中的粘包和拆包解决方案
Netty提供了4种解码器来解决,分别如下:
· 固定长度的拆包器 FixedLengthFrameDecoder,每个应用层数据包的都拆分成都是固定长度的大小
· 行拆包器 LineBasedFrameDecoder,每个应用层数据包,都以换行符作为分隔符,进行分割拆分
· 分隔符拆包器 DelimiterBasedFrameDecoder,每个应用层数据包,都通过自定义的分隔符,进行分割拆分
· 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder,将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要 求,就是应用层协议中包含数据包的长度

以上是关于分布式理论,架构设计 Netty高级应用的主要内容,如果未能解决你的问题,请参考以下文章

分布式理论,架构设计 Netty高级应用

分布式理论,架构设计 Netty

分布式理论,架构设计 Netty

分布式理论,架构设计自定义RPC

分布式理论,架构设计自定义RPC

分布式理论,架构设计自定义RPC