网络I/o编程模型12 netty实现tcp服务通讯(含EventLoop实现调度)

Posted 健康平安的活着

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络I/o编程模型12 netty实现tcp服务通讯(含EventLoop实现调度)相关的知识,希望对你有一定的参考价值。

一 netyy实现tcp通讯

1.1 案例需求描述

1.netty服务端可以监听6666端口,客户端向此服务器进行发送信息。例如“hello,netty服务器....”;

2.服务端收到信息对客户端进行回复信息,例如“hello,客户端发的消息已经收到.....”

1.2 代码

1.2.1 客户端

1.服务端代码

package com.ljf.netty.netty.tcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NiosocketChannel;


/**
 * @ClassName: NettyTcpClient
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2022/05/29 15:52:40
 * @Version: V1.0
 **/
public class NettyTcpClient 
    public static void main(String[] args) 
        //客户端需要一个事件循环组
        EventLoopGroup group=new NioEventLoopGroup();
        try 
        //创建客户端启动对象,注意的一点是 客户端使用的不是ServerBootstrap,而是Bootstrap
        Bootstrap bootstrap=new Bootstrap();
        //设置相关参数
        bootstrap.group(group) //设置线程组
        .channel(NioSocketChannel.class) //设置客户端通道的实现类(反射)
        .handler(new ChannelInitializer<SocketChannel>() 
            protected void initChannel(SocketChannel ch)
                ch.pipeline().addLast(new NettyTcpClientHandler());//加入自己的处理器
            
        );
        //客户端主备就绪
        System.out.println("客户端 is ok.....");
        //启动客户端去链接服务器端,channelfuture,涉及道netty的异步模型
            ChannelFuture channelFuture=bootstrap.connect("127.0.0.1",6666).sync();
         //给关闭通道进行监听
         channelFuture.channel().closeFuture().sync();
         catch (InterruptedException e) 
            e.printStackTrace();
        
        finally 
            group.shutdownGracefully();
        

    

2.client的自定义的handler

package com.ljf.netty.netty.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;

import java.nio.ByteBuffer;

/**
 * @ClassName: NettyTcpClientHandler
 * @Description: TODO 这是自己定义handler,客户端发送的信息
 * @Author: liujianfu
 * @Date: 2022/05/29 16:38:47
 * @Version: V1.0
 **/
public class NettyTcpClientHandler extends ChannelInboundHandlerAdapter 
    /**
     * @author liujianfu
     * @description
     * channelHandlerContext ctx 上下文对象中,含有: 管道pipeline  通道channel ,地址
     * Object msg: 就是客户端发送的数据,默认为Object
     * 当通道就绪就会触发此方法
     * @date 2022/5/29 15:34
     * @param [ctx, msg]
     * @return void
     */
    public void channelActive(ChannelHandlerContext ctx)
         System.out.println("触发 channelActive");
         String sendContent=" 你好,服务端器,我是客户端252525!";
        System.out.println("================客户端--->服务端的消息:【 "+ sendContent+" 】");
        ctx.writeAndFlush(Unpooled.copiedBuffer(sendContent,CharsetUtil.UTF_8));
        System.out.println("================客户端--->服务端消息完毕=================================");
    
    //当通道有读取事件时,会触发
    public void channelRead(ChannelHandlerContext ctx,Object msg)
        ByteBuf buf=(ByteBuf)msg;
        System.out.println("收到服务端的回执信息:"+buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器的地址:"+ ctx.channel().remoteAddress());

    
    //处理异常,一般需要关闭通道
    public  void exceptionCaught(ChannelHandlerContext ctx,Throwable cause)
        cause.printStackTrace();
        ctx.close();
    


1.2.2 服务端

1.服务端代码

package com.ljf.netty.netty.tcp;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @ClassName: NettyTcpServer
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2022/05/28 19:32:58
 * @Version: V1.0
 **/
public class NettyTcpServer 
    /**
    * @author liujianfu
    * @description
     * 1.创建连个线程组bossGroup和workerGroup
     * 2.bossGroup只是处理连接请求,真正和客户端业务处理的,会交给workerGroup完成。
     * EventLoopGroup bossGroup=new NioEventLoopGroup(1);//默认为cpu核数*2;
     * EventLoopGroup workGroup=new NioEventLoopGroup();
     * 3.两个都是无线循环,构造函数可以设置子线程的个数。
     *
     *
    * @date 2022/5/28 19:35
    * @param [args]        
    * @return void
    */
    public static void main(String[] args)  
        //创建连个线程组bossGroup和workerGroup
        EventLoopGroup bossGroup=new NioEventLoopGroup(1);//默认为cpu核数*2;
        EventLoopGroup workGroup=new NioEventLoopGroup();
        try 
        //创建服务器端的启动对象,配置参数
        ServerBootstrap bootstrap=new ServerBootstrap();
        //使用链式编程来进行设置
        bootstrap.group(bossGroup,workGroup) //设连个线程组
                 .channel(NioServerSocketChannel.class) //使用NioSocketChannel作为服务器的通道实现
                 .option(ChannelOption.SO_BACKLOG,128)//设置线程队列得到的连接个数
                 .childOption(ChannelOption.SO_KEEPALIVE,true)  //设置保持活动连接状态
                  .childHandler(new ChannelInitializer<SocketChannel>() // 创建一个普通测试对象,内名内部类对象
                      //给pipeline 设置处理
                      protected  void  initChannel(SocketChannel ch)
                          ch.pipeline().addLast(new NettyTcpServerHandler());
                      
                  );//给我们的workerGroup的EventLopp对应的管道设置处理器,自定义的。
        System.out.println(".......netty服务器 is ready.....");
        //绑定一个端口并且同步,生成了一个ChanelFuture对象
        //启动服务器(并绑定端口)
        ChannelFuture channelFuture=bootstrap.bind(6666).sync();
        //对关闭的通道进行监听
            channelFuture.channel().closeFuture().sync();
         catch (InterruptedException e) 
            e.printStackTrace();
        
        finally 
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        
    

2.服务端handler

package com.ljf.netty.netty.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.util.CharsetUtil;


/**
 * @ClassName: NettyTcpServerHandler
 * @Description: TODO   这是自己定义handler,读取客户端发送过来的数据
 * @Author: liujianfu
 * @Date: 2022/05/29 14:15:14
 * @Version: V1.0
 **/
public class NettyTcpServerHandler extends ChannelInboundHandlerAdapter 
   /**
   * @author liujianfu
   * @description
    * channelHandlerContext ctx 上下文对象中,含有: 管道pipeline  通道channel ,地址
    * Object msg: 就是客户端发送的数据,默认为Object
   * @date 2022/5/29 15:34
   * @param [ctx, msg]
   * @return void
   */
    public void channelRead(ChannelHandlerContext ctx,Object msg)
        System.out.println("服务器读取线程:"+Thread.currentThread().getName());
        System.out.println(" server ctx:"+ctx);
        //获取通道信息
        Channel channel= (Channel) ctx.channel();
        //获取pipeline,本质上是一个双向链接,出战入战
        ChannelPipeline pipeline=ctx.pipeline();
        //将msg转成一个ByteBuf,ByteBUf是netty提供的,不是Nio提供的
        ByteBuf buf=(ByteBuf)msg;
        String content=buf.toString(CharsetUtil.UTF_8);
        System.out.println("接受道客户端发来的消息:"+content);
        System.out.println("客户端的地址:"+channel.remoteAddress());
    
    //数据读取完毕
    public void channelReadComplete(ChannelHandlerContext ctx)
        //将数据写入道缓存,并刷新
        ctx.writeAndFlush(Unpooled.copiedBuffer(" hello,客户端,发来的消息已经收到!",CharsetUtil.UTF_8));

    
    //处理异常,一般需要关闭通道
    public  void exceptionCaught(ChannelHandlerContext ctx,Throwable cause)
        ctx.close();
    

1.2.3 调用

1.启动服务端

 2.再启动客户端

1.2.4 将服务端自定义的handler设置定时任务执行

​
package com.ljf.netty.netty.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;

import java.util.concurrent.TimeUnit;

/**
 * @ClassName: NettyTcpServerHandler3
 * @Description: TODO
 * @Author: liujianfu
 * @Date: 2022/05/29 17:44:26
 * @Version: V1.0
 **/
public class NettyTcpServerHandler3 extends ChannelInboundHandlerAdapter 
    /**
     * @author liujianfu
     * @description
     * channelHandlerContext ctx 上下文对象中,含有: 管道pipeline  通道channel ,地址
     * Object msg: 就是客户端发送的数据,默认为Object
     * @date 2022/5/29 15:34
     * @param [ctx, msg]
     * @return void
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg)
        System.out.println("服务器读取线程:"+Thread.currentThread().getName());
        System.out.println(" server ctx:"+ctx);
        //获取通道信息
        Channel channel= (Channel) ctx.channel();
        //获取pipeline,本质上是一个双向链接,出战入战
        ChannelPipeline pipeline=ctx.pipeline();
        //将msg转成一个ByteBuf,ByteBUf是netty提供的,不是Nio提供的
        ByteBuf buf=(ByteBuf)msg;
        String content=buf.toString(CharsetUtil.UTF_8);
        System.out.println("接受道客户端发来的消息:"+content);
        System.out.println("客户端的地址:"+channel.remoteAddress());
        //自定义定时任务,任务放到scheduledtaskqueue中,多少秒侯执行
        ctx.channel().eventLoop().schedule(new Runnable() 
            @Override
            public void run() 
                try 
                    Thread.sleep(5*1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer(" hello,客户端,发来的消息已经收到11111!",CharsetUtil.UTF_8));
                    System.out.println("channel code:"+ctx.channel().hashCode());
                 catch (InterruptedException e) 
                    e.printStackTrace();
                    System.out.println("发生异常:"+e.getStackTrace());
                

            
        ,5,TimeUnit.SECONDS);
        ctx.channel().eventLoop().scheduleAtFixedRate(new Runnable() 
            @Override
            public void run() 
                try 
                    Thread.sleep(3*1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer(" hello,客户端,发来的消息已经收到2222222",CharsetUtil.UTF_8));
                    System.out.println("channel code:"+ctx.channel().hashCode());
                 catch (InterruptedException e) 
                    e.printStackTrace();
                    System.out.println("发生异常:"+e.getStackTrace());
                

            
        ,5,10,TimeUnit.SECONDS);


    
    //数据读取完毕
    public void channelReadComplete(ChannelHandlerContext ctx)
        //将数据写入道缓存,并刷新
        ctx.writeAndFlush(Unpooled.copiedBuffer(" hello,客户端,发来的消息已经收到!",CharsetUtil.UTF_8));

    
    //处理异常,一般需要关闭通道
    public  void exceptionCaught(ChannelHandlerContext ctx,Throwable cause)
        ctx.close();
    


​

查看结果

Netty利用EventLoop实现调度任务执行_w3cschool

开发者涨薪指南 48位大咖的思考法则、工作方式、逻辑体系

以上是关于网络I/o编程模型12 netty实现tcp服务通讯(含EventLoop实现调度)的主要内容,如果未能解决你的问题,请参考以下文章

网络I/o编程模型10 netty介绍

网络I/o编程模型16 netty框架实现的群聊系统

网络I/o编程模型21 netty的粘包和拆包问题的解决方案

Netty网络编程第六卷

网络I/o编程模型14 netty的核心组件

网络I/o编程模型17 netty框架实现心跳机制