13.RPC的socket实现(阻塞式)与netty实现(非阻塞式)

Posted PacosonSWJTU

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了13.RPC的socket实现(阻塞式)与netty实现(非阻塞式)相关的知识,希望对你有一定的参考价值。

【README】

  • 1.本文总结了 RPC 的概念,包括定义,RPC实现,及其优缺点;
  • 2.本文po出了RPC的简单代码实现,总结自 B站《netty-尚硅谷》;
  • 3.本文部分内容总结自 :What Is Remote Procedure Call (RPC)? Definition from SearchAppArchitecture
  • 4.本文还po出了 RPC 与 RMI 的区别;
  • 5.基于socket的rpc是阻塞式的,而基于netty的rpc是非阻塞式

【1】RPC 

【1.1】RPC定义

1)RPC定义: Remote Procedure Call, 远程过程调用协议

远程过程调用是一种软件通讯协议,基于该协议,一个程序可以通过网络调用另一台计算机上的服务而无需关心细节,就像调用本地系统的服务一样。

【补充】RMI:Remote Method Invocation,远程方法调用

  • RMI 能够让jvm上的客户端像调用本地方法一样,调用远程jvm服务器上对象的方法;
  • 显然, RMI是 RPC 的 java版本的实现

2)常见的 RPC 框架有:

  • 比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpcx、Apache的thrift, Spring 旗下的 Spring Cloud。

【1.2】RPC的实现

1)当触发远程过程调用,调用环境会被挂起,过程参数会通过网络传送到过程执行环境(过程参数可以理解为是类名,方法名,方法参数,方法参数类型等)。

2)当服务器过程执行完成,结果会被回传到调用环境;只要过程调用返回则客户端立即恢复执行;

3)RPC调用过程中,会经过如下步骤:

  • 步骤1:客户端调用客户端存根(代理);该调用是本地过程调用,并把参数压入栈;
  • 步骤2:客户端代理封装过程参数到报文,并发送报文;把过程参数打包称为 参数编组
  • 步骤3:客户端本地操作系统发送报文到远程服务器;
  • 步骤4:服务器操作系统把请求数据包送入 服务器存根(代理)
  • 步骤5:服务器代理把参数解包,这个过程称为 参数解组
  • 步骤6:当服务器过程调用完成,把结果返回给服务器代理;服务器代理把返回值封装到报文;服务器代理接着把报文传递给传输层
  • 步骤7:服务器传输层发送响应报文到客户端传输层,接着回传给客户端代理
  • 步骤8:客户端代理解组返回的参数,返回到调用者的执行点;

4)RPC 调用流程图

【图解】 在RPC中:

  • 1)客户端称为服务消费者;服务端称为服务提供者;
  • 2)编码是把对象转为字节数组(字节码),以便网络传输,即序列化
  • 3)解码是把字节数组(字节)转为对象,以便业务逻辑处理,即反序列化
  • 4)综上,RPC需要用到动态代理, 序列化与反序列化技术等;

 【1.3】RPC的优缺点

1)优点:

  • 在高级语言中,有助于客户端通过传统过程调用与服务器通讯;
  • 能够被用于分布式环境,远程调用就像本地调用一样;
  • 支持面向进程与面向线程模型;
  • 对用户隐藏了内部报文传输机制;
  • 仅需极少的工作即可重写和重新开发代码;
  • 删除了许多协议层以提高性能;

2)缺点:

  • 客户端与服务器各自使用不同执行环境,使用资源非常复杂,如文件。因此 RPC系统不适合传输大量数据;
  • RPC极易失败,因为它涉及到通讯系统,另一外机器和另一个处理过程;
  • RPC没有统一的标准,可以通过多种方式实现;
  • RPC 只是基于交互的,因此,它对硬件架构没有提供任何灵活性;

【2】RPC的Socket实现(阻塞式)

部分代码转自: 简单RPC之Socket实现_归田的博客-CSDN博客_rpc socket

0)目录结构:

【图解】RPC的实现代码包含3部分:

1)公共协议:

  • 约定的报文格式(封装了过程调用参数,包括全限定类名,方法名,参数类型数组,参数值数组),以便于服务端(服务提供方)根据反射创建对应类与调用方法
  • 约定的过程名(如方法名):把方法名封装到接口;

2)客户端(服务消费方):

  • 通过代理模式创建服务的代理对象;调用方法,实际上调用的是代理对象方法,代理对象方法底层封装报文通过socket请求服务器获取调用结果;(客户端在整个调用过程是阻塞的,直到结果返回

3)服务端(服务提供方):

  • 根据报文中的 类名通过反射创建服务实例,根据方法名,参数类型,参数列表通过反射调用方法,并将结果回写到socket

注意(报文的序列化与反序列化):

  • 客户端把报文对象写出到 socket 前需要序列化为字节数组;服务器从socket读入字节数组后,需要反序列化为报文对象;

【2.1】协议报文与过程调用方法定义:

报文:

/**
 * @Description 报文(注意:要可序列化)
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月12日
 */
public class MyRpcMessage implements Externalizable 
    /** 全限定类名 */
    private String className;
    /** 方法名 */
    private String methodName;
    /** 参数类型clazz数组 */
    private Class<?>[] parameterTypes;
    /** 参数值 */
    private Object[] parameterValues;

    /**
     * @description 私有构造器
     * @author xiao tang
     * @date 2022/9/12
     */
    public MyRpcMessage() 
    

    /**
     * @description 创建生成器
     * @return 生成器
     * @author xiao tang
     * @date 2022/9/12
     */
    public static Builder builder() 
        Builder builder =  new Builder();
        return builder;
    

    public String getClassName() 
        return className;
    

    public void setClassName(String className) 
        this.className = className;
    

    public String getMethodName() 
        return methodName;
    

    public void setMethodName(String methodName) 
        this.methodName = methodName;
    

    public Class<?>[] getParameterTypes() 
        return parameterTypes;
    

    public void setParameterTypes(Class<?>[] parameterTypes) 
        this.parameterTypes = parameterTypes;
    

    public Object[] getParameterValues() 
        return parameterValues;
    

    public void setParameterValues(Object[] parameterValues) 
        this.parameterValues = parameterValues;
    

    @Override
    public void writeExternal(ObjectOutput out) throws IOException 
        out.writeUTF(className);
        out.writeUTF(methodName);
        out.writeObject(parameterTypes);
        out.writeObject(parameterValues);
    

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException 
        this.className = in.readUTF();
        this.methodName = in.readUTF();
        this.parameterTypes = (Class<?>[]) in.readObject();
        this.parameterValues = (Object[]) in.readObject();
    

    /**
     * @description 生成器
     * @author xiao tang
     * @date 2022/9/12
     */
    public static class Builder 
        /** rpc报文对象 */
        private MyRpcMessage myRpcMessage;

        /**
         * @description 私有构造器
         * @author xiao tang
         * @date 2022/9/12
         */
        private Builder() 
            this.myRpcMessage = new MyRpcMessage();
        

        /**
         * @description 设置全限定类名
         * @return 生成器
         * @author xiao tang
         * @date 2022/9/12
         */
        public Builder className(String className) 
            this.myRpcMessage.setClassName(className);
            return this;
        

        /**
         * @description 设置方法名
         * @return 生成器
         * @author xiao tang
         * @date 2022/9/12
         */
        public Builder methodName(String methodName) 
            this.myRpcMessage.setMethodName(methodName);
            return this;
        

        /**
         * @description 设置参数类型数组
         * @return 生成器
         * @author xiao tang
         * @date 2022/9/12
         */
        public Builder parameterTypes(Class<?>[] parameterTypes) 
            this.myRpcMessage.setParameterTypes(parameterTypes);
            return this;
        

        /**
         * @description 设置参数值数组
         * @return 生成器
         * @author xiao tang
         * @date 2022/9/12
         */
        public Builder parameterValues(Object[] parameterValues) 
            this.myRpcMessage.setParameterValues(parameterValues);
            return this;
        

        /**
         * @description 获得rpc报文对象
         * @return rpc报文对象
         * @author xiao tang
         * @date 2022/9/12
         */
        public MyRpcMessage build() 
            return myRpcMessage;
        
    

方法接口定义:

/**
 * @Description 过程调用接口
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月11日
 */
public interface IHelloRpc 

    String hello(String name);

【2.2】服务器:

/**
 * @Description rpc服务器
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月11日
 */
public class MyRpcSocketServer 
    /** 线程池 */
    private static ExecutorService THREAD_POOL = Executors.newCachedThreadPool();

    public static void main(String[] args) 
        try 
            startServer0(8089);
         catch (IOException e) 
            e.printStackTrace();
         finally 
            THREAD_POOL.shutdown();
        
    

    /**
     * @description 开启服务器
     * @param port 端口
     * @author xiao tang
     * @date 2022/9/11
     */
    private static void startServer0(int port) throws IOException 
        ServerSocket serverSocket = new ServerSocket(port);
        System.out.println("服务器启动成功.");
        while(true) 
            try 
                // 阻塞直到有客户端请求
                Socket socket = serverSocket.accept();
                // 处理器请求
                THREAD_POOL.execute(new BusiTask(socket));
             catch (Exception e) 
                e.printStackTrace();
                System.out.println("抛出异常");
                throw e;
            
        
    

    /**
     * @description 业务作业,Runnable子类
     * @author xiao tang
     * @date 2022/9/11
     */
    static class BusiTask implements Runnable 
        private Socket socket;
        public BusiTask(Socket socket) 
            this.socket = socket;
        
        @Override
        public void run() 
            // 获得字节输入输出流
            try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
                    ObjectOutputStream outputMsg = new ObjectOutputStream(socket.getOutputStream());) 
                // 获取过程参数(报文),包括类名,方法名,方法参数类型,方法参数列表
                MyRpcMessage inputMsg = (MyRpcMessage)inputStream.readObject();
                // 通过反射创建对象
                Class<?> clazz = Class.forName(inputMsg.getClassName());
                Object procedureObj = clazz.newInstance();
//                // 通过反射创建方法
                Method method = clazz.getDeclaredMethod(inputMsg.getMethodName(), inputMsg.getParameterTypes());
//                // 通过反射调用方法
                Object result = method.invoke(procedureObj, inputMsg.getParameterValues());
                // 把结果回写
                outputMsg.writeObject(result);
             catch (Exception e) 
                e.printStackTrace();
             finally 
                try 
                    socket.close();
                 catch (IOException e) 
                    e.printStackTrace();
                
            
        
    

过程调用接口实现类:

/**
 * @Description 过程调用接口实现
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月11日
 */
public class IHelloRpcServerImpl implements IHelloRpc 
    @Override
    public String hello(String name) 
        return "hello " + name + ", nice to meet you, i am a server named RPC.";
    

【2.3】客户端:

/**
 * @Description RPC存根(代理)工厂
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月11日
 */
public class MyRpcProxyFactory 

    /**
     * @description 私有构造器
     * @author xiao tang
     * @date 2022/9/11
     */
    private MyRpcProxyFactory() 
    

    /**
     * @description 创建代理对象
     * @param targetClazz 目标对象clazz
     * @param className 类名
     * @return 代理对象
     * @author xiao tang
     * @date 2022/9/11
     */
    public static <T> T newProxy(Class<T> targetClazz, String className) 
        return (T) Proxy.newProxyInstance(
                targetClazz.getClassLoader()
                , new Class<?>[]targetClazz
                , (Object proxy, Method method, Object[] args) ->
                        Socket socket = new Socket("localhost",8089);
                        // 获取输出流
                        try(ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
                            ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) 
                            // 写出过程参数到服务器,包括类名,方法名,方法参数类型,方法参数列表
                            MyRpcMessage message = MyRpcMessage.builder()
                                                    .className(className)
                                                    .methodName(method.getName())
                                                    .parameterTypes(method.getParameterTypes())
                                                    .parameterValues(args)
                                                    .build();
                            outputStream.writeObject(message);
                            outputStream.flush();

                            // 获取输入流
                            Object result = inputStream.readObject();
                            if (result instanceof Throwable) 
                                throw (Throwable)result;
                            
                            return result;
                         finally 
                            socket.close();
                        
                );
    

测试用例入口:

public class MyRpcSocketClient 

    public static void main(String[] args) 
        IHelloRpc helloRpc = MyRpcProxyFactory.newProxy(IHelloRpc.class, "com.my.netty.rpc.socket.server.IHelloRpcServerImpl");
        String result = helloRpc.hello("成都");
        System.out.println(result);
    

【2.4】运行效果:

hello 成都, nice to meet you, i am a server named RPC.

【3】RPC的netty 实现

1)背景: 基于socket的rpc实现时阻塞式的,吞吐量不高; 本文尝试使用netty 实现rpc ;

2)netyt优势:

  • netty是非阻塞的,能够很好利用多核cpu的算力,增大系统吞吐量;
  • netty底层采用零拷贝,报文通讯性能更优;

3)代码目录:

【图解】 代码还是分为3部分(常量不算)

1)公共协议:

  • 把过程方法抽象为接口,客户端与服务器以该接口进行通讯;

2)客户端:

  • 通过代理模式创建服务的代理对象;调用方法,实际上调用的是代理对象方法,代理对象方法底层封装报文(一个简单的字符串)通过 netty非阻塞式连接服务器,并把报文写出到channel,送入服务器(客户端在整个调用过程是非阻塞的);
  • 当前线程睡眠直到服务端的响应报文回写到客户端;

3)服务端:

  • 服务器在接收到客户端请求报文后,若协议报文头以 HelloServer#hello# 开头,则调用预设  HelloService.hello() 方法;
  • 调用hello() 方法完成后,把结果回写到netty的channel通道
  • 补充:为了简化编程,基于netty的rpc实现没有使用反射来创建类和调用方法,而是写死 方法;当然了,像上文的socket版本的rpc一样,可以把类名,方法名,参数类型,参数值列表等过程参数封装到报文头,服务器可以根据上述报文头通过反射来创建类和调用方法

【3.1】公共协议(约定)

1)公共接口;

/**
 * @Description 公共接口,服务提供方,服务消费方都需要
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月11日
 */
public interface HelloService 

    String hello(String msg);

常量:

public class MyDubboConst 
    /** 协议头 */
    public static final String PROTOCOL_HEAD = "HelloServer#hello#";

【3.2】 客户端

1)netty客户端代理:

/**
 * @Description 客户端-服务消费者
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月11日
 */
public class MyDubboNettyClient 
    // 创建线程池
    private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    // 客户端处理器
    private static MyDubboNettyClientHandler clientHandler;

    private int counter = 0;

    // 编写方法,使用代理模式,获取代理对象
    public Object getBean(final Class<?> serviceClass, final String protocolHead) 
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]serviceClass, (Object proxy, Method method, Object[] args)->
                    System.out.println("Object proxy, Method method, Object[] args 进入了,第 " + (++counter) + " 次进入");
                    // 下面的代码, 客户端每调用一次hello,就会进入到该代码
                    if (clientHandler == null) 
                        initClientHandler(); // 初始化客户端处理器
                    
                    // 设置要发送给服务器的信息
                    // protocolHead 协议头部
                    // args[0] 就是客户端调用 hello的参数
                    System.out.println("clientHandler.setParameter");
                    clientHandler.setParameter(protocolHead + args[0]);
                    System.out.println("executorService.submit");
                    return executorService.submit(clientHandler).get();
                );
    

    // 初始化客户端处理器
    private static void initClientHandler() 
        clientHandler = new MyDubboNettyClientHandler();
        // 创建 EventLoopGroup
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NiosocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() 
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception 
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(clientHandler);
                    
                );
        // 启动客户端
        ChannelFuture channelFuture = null;
        try 
            channelFuture = bootstrap.connect("127.0.0.1", 8089).sync();
         catch (InterruptedException e) 
            e.printStackTrace();
        
        System.out.println("客户端启动成功");
    

2)netty客户端业务逻辑处理器

/**
 * @Description dubbo客户端处理器
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月11日
 */
public class MyDubboNettyClientHandler extends ChannelInboundHandlerAdapter implements Callable 

    private ChannelHandlerContext context; // 上下文
    private String result; // 返回结果
    private String parameter; // 客户端调用方法时,传入的参数

    // 与服务器连接创建成功后,就会被调用 (第1个被调用 )
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        System.out.println("MyDubboNettyClientHandler.channelActive");
        context = ctx; // 因为在其他方法也会用到这个 context
    

    // 收到服务器的数据后,就会调用方法
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        System.out.println("MyDubboNettyClientHandler.channelRead");
        result = msg.toString();
        notify(); // 唤醒等待的线程
    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        System.out.println("MyDubboNettyClientHandler.exceptionCaught");
        ctx.close();
    

    // 被代理对象调用, 发送数据给服务器 -> 然后 wait -> 等待被唤醒  -> 返回结果 (第3个被调用)
    @Override
    public synchronized Object call() throws Exception 
        System.out.println("发送数据到服务器, parameter = " + parameter);
        context.writeAndFlush(parameter); // 发送数据给服务器
        System.out.println("发送数据到服务成功");
        wait(); // 等待直到服务器返回结果,服务器返回后调用 channelRead 方法来唤醒;
        System.out.println("after wait().");
        return result; // 服务提供方返回的结果
    
    // 第2个被调用,设置参数
    public void setParameter(String parameter) 
        System.out.println("setParameter");
        this.parameter = parameter;
    

3)客户端启动引导程序:

public class MyDubboClientBootstrap 

    public static void main(String[] args) 
        // 创建访问消费者-netty客户端
        MyDubboNettyClient customer = new MyDubboNettyClient();
        // 创建代理对象
        HelloService helloService = (HelloService)customer.getBean(HelloService.class, MyDubboConst.PROTOCOL_HEAD);

        // 通过代理对象调用服务提供者的服务
        String result = helloService.hello("hello服务器");
        System.out.println("服务器响应结果 = " + result);
    

【3.3】服务器

1)过程调用(方法调用)的具体实现

/**
 * @Description 服务提供方实现接口
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月11日
 */
public class HelloServiceImpl implements HelloService 
    private int counter;
    // 当有消费方调用该方法时,就返回一个字符串
    @Override
    public String hello(String msg) 
        System.out.println("收到客户端消息 = " + msg);
        // 根据msg 返回不同结果
        if (msg != null) 
            return "你好客户端,我收到你的消息 ["+msg+"]这是第" + (++counter) + "次收到消息";
         else 
            return "你好客户端,我收到的消息为空" ;
        
    

2)netty服务器

/**
 * @Description 基于netty的 Dubbo 服务器
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月11日
 */
public class MyDubboNettyServer 

    public static void startServer(String hostName, int port) throws InterruptedException 
        startServer0(hostName, port);
    

    private static void startServer0(String hostName, int port) throws InterruptedException 
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try 
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() 
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception 
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new MyDubboNettyServerHandler()) ; // 自定义业务处理器
                        
                    );
            // 启动服务器
            ChannelFuture channelFuture = serverBootstrap.bind(hostName, port).sync();
            System.out.println("服务提供方启动成功,开始提供服务"); 
            // 监听关闭
            channelFuture.channel().closeFuture().sync();
         finally 
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        
    

3)netty服务器业务处理器

/**
 * @Description 基于netty的 Dubbo 服务器处理器
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月11日
 */
public class MyDubboNettyServerHandler extends ChannelInboundHandlerAdapter 

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) 
        // 获取客户端发送的消息 并 调用公共接口服务
        System.out.println("接收的msg = " + msg);
        // 客户端在调用服务器的api时,我们需要定义一个协议
        // 如 msg 以 "HelloServer#hello#" 开头,才调用api
        String msgString = msg.toString();
        if (msgString.startsWith(MyDubboConst.PROTOCOL_HEAD)) 
            String result = new HelloServiceImpl().hello(msgString.substring(msgString.lastIndexOf("#")+1));
            ctx.writeAndFlush(result);
         else 
            ctx.writeAndFlush("协议头非法");
        
    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        cause.printStackTrace();
        ctx.close();
    

4)服务器启动引导程序:

public class MyDubboServerBootstrap 
    public static void main(String[] args) throws InterruptedException 
        // 代码代填
        MyDubboNettyServer.startServer("127.0.0.1", 8089);
    

【3.4】运行效果:

1)服务器日志:

服务提供方启动成功,开始提供服务
[DEBUG] Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
[DEBUG] Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
[DEBUG] Recycler - -Dio.netty.recycler.linkCapacity: 16
[DEBUG] Recycler - -Dio.netty.recycler.ratio: 8
[DEBUG] AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
[DEBUG] ResourceLeakDetectorFactory$DefaultResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@e62edfd
接收的msg = HelloServer#hello#hello服务器
收到客户端消息 = hello服务器

2)客户端日志:

客户端启动成功
clientHandler.setParameter
MyDubboNettyClientHandler.channelActive
setParameter
executorService.submit
发送数据到服务器, parameter = HelloServer#hello#hello服务器
[DEBUG] Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
[DEBUG] Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
[DEBUG] Recycler - -Dio.netty.recycler.linkCapacity: 16
[DEBUG] Recycler - -Dio.netty.recycler.ratio: 8
发送数据到服务成功
[DEBUG] AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
[DEBUG] ResourceLeakDetectorFactory$DefaultResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@89e890f
MyDubboNettyClientHandler.channelRead
after wait().
服务器响应结果 = 你好客户端,我收到你的消息 [hello服务器]这是第1次收到消息

 

以上是关于13.RPC的socket实现(阻塞式)与netty实现(非阻塞式)的主要内容,如果未能解决你的问题,请参考以下文章

socket-demo的实现

Java NIO实现非阻塞式socket通信

socket 客户端编程:非阻塞式连接,错误判断及退出重连

阻塞式/非阻塞式与同步/异步的区别

非阻塞式socket的select()用法

什么是socket网络编程