如何使用Netty打造工业级RPC框架

Posted tangtong1

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用Netty打造工业级RPC框架相关的知识,希望对你有一定的参考价值。

如何使用Netty打造工业级RPC框架

郑重声明:此框架旨在理解rpc框架的本质,切勿直接在生产环境使用,否则后果自负。

rpc框架的几点疑问

如何通信?

两个服务之间的通信方式有很多,比如http、tcp、websocket、http2.0(stream),每种方式都有优秀和弊端的地方,比如http是短连接,高并发的时候不合适。

这里我们使用Netty来作为我们的传输层,Netty是对底层io的封装,本身就支持多种协议,扩展性好,性能棒,我们直接使用它的nio传输方式即可。

如何像调用本地服务一样无感?

首先,客户端与服务端需要有共同的接口,服务端实现业务逻辑,客户端需要有一个接口的实例,这个实例我们怎么构造呢?其实很简单,通过动态代理即可,动态代理的实现方式有很多,比如jdk proxy,javasist,cglib等,其中jdk proxy方便学习,所以我们这里直接使用jdk proxy作为我们的动态代理部分。

我们通过jdk proxy生成目标接口的一个实例,这实例内部封装接口调用的参数、类型等信息,通过传输层发送到远程服务端,远程服务端通过反射拿到具体的实现类,通过反射调用其对应的方法,拿到结果,再把结果封装通过传输层再传回客户端,客户端拿到结果展示即可。

客户端如何动态感知服务端?

通过zookeeper的监听机制,我们可以动态感知一个服务的上线下线,一个客户端可同时监听多个服务端,一个服务端下线了,重试其它服务端,我们今天的框架中先不写注册中心相关的东西。

整体流程


通过上图我们可以看到一个rpc框架的全貌,基本包括代理层、协议层、传输层等几个部分,其中协议层又包括编解码、序列化反序列化等部分,代理层又包括客户端的代理和服务端的代理(Invoker),下面我们一一来介绍并编写相关部分的代码。

支持层

这一层实际并不存在,是不在上图中的其它的一些对象的统称,你可以在看主体流程的过程中,回来查看相应的数据结构,这部分对象包括:

  • 实际传输的对象,RpcRequest、RpcResponse

RpcRequest.java

package com.coolcoding.rpc.model;

import com.alibaba.fastjson.JSON;

public class RpcRequest 
    // 唯一标识一次请求
    private Long id;
    // 服务名称
    private String serviceName;
    // 方法名称
    private String methodName;
    // 参数
    private Object[] params;
    // 参数类型
    private Class<?>[] paramTypes;

    public Long getId() 
        return id;
    

    public void setId(Long id) 
        this.id = id;
    

    public String getServiceName() 
        return serviceName;
    

    public void setServiceName(String serviceName) 
        this.serviceName = serviceName;
    

    public String getMethodName() 
        return methodName;
    

    public void setMethodName(String methodName) 
        this.methodName = methodName;
    

    public Class<?>[] getParamTypes() 
        return paramTypes;
    

    public void setParamTypes(Class<?>[] paramTypes) 
        this.paramTypes = paramTypes;
    

    public Object[] getParams() 
        return params;
    

    public void setParams(Object[] params) 
        this.params = params;
    

    @Override
    public String toString() 
        return JSON.toJSONString(this);
    


RpcResponse.java

package com.coolcoding.rpc.model;

import com.alibaba.fastjson.JSON;

public class RpcResponse 
    // 唯一标识一个请求,与RpcRequest中对应
    private Long id;
    // 方法调用的结果
    private Object response;
    // 异常信息
    private Throwable cause;

    public Long getId() 
        return id;
    

    public void setId(Long id) 
        this.id = id;
    

    public Object getResponse() 
        return response;
    

    public void setResponse(Object response) 
        this.response = response;
    

    public Throwable getCause() 
        return cause;
    

    public void setCause(Throwable cause) 
        this.cause = cause;
    

    @Override
    public String toString() 
        return JSON.toJSONString(this);
    


  • 参数类型的转换,ConvertUtils

ConvertUtils.java

package com.coolcoding.rpc.convert;

/**
 * 转换类型工具类
 * 因为json序列化是不带类型的,生产中建议使用Protobuf等序列化框架,double会被转成BigDecimal
 */
public class ConvertUtils 

    public static Object convert(Object value, Class<?> clz) 
        // 暂时只支持Double,且不支持小写
        if (clz == Double.class) 
            return new Double(value.toString());
        
        throw new UnsupportedTypeException();
    


  • 支持异步调用,RpcResponseFuture

RpcResponseFuture.java

package com.coolcoding.rpc.future;

import com.coolcoding.rpc.convert.ConvertUtils;
import com.coolcoding.rpc.model.RpcResponse;

import java.util.concurrent.BlockingQueue;

/**
 * 用于支持异步获取调用结果
 */
public class RpcResponseFuture 
    // 使用ThreadLocal保存当前线程的RpcResponseFuture
    private static final ThreadLocal<RpcResponseFuture> LOCAL = new ThreadLocal<>();

    // 一个队列
    private BlockingQueue<RpcResponse> queue;
    // 方法调用的返回类型
    private Class<?> returyType;

    public void setQueue(BlockingQueue<RpcResponse> queue) 
        this.queue = queue;
    

    public void setReturyType(Class<?> returyType) 
        this.returyType = returyType;
    

    public static RpcResponseFuture getRpcResponseFuture() 
        return LOCAL.get();
    

    // 创建一个RpcResponseFuture并把它设置到ThreadLocal中
    public static RpcResponseFuture newRpcResponseFuture() 
        RpcResponseFuture rpcResponseFuture = new RpcResponseFuture();
        LOCAL.set(rpcResponseFuture);
        return rpcResponseFuture;
    

    // 获取异步调用的结果
    public Object get() 
        if (queue != null) 
            try 
                RpcResponse rpcResponse = queue.take();
                if (rpcResponse.getCause() != null) 
                    throw rpcResponse.getCause();
                
                return ConvertUtils.convert(rpcResponse.getResponse(), returyType);
             catch (Throwable e) 
                e.printStackTrace();
            
        
        return null;
    


  • 保存服务

ServiceHolder.java

package com.coolcoding.rpc.invoker;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ServiceHolder 

    private static Map<String, Object> map = new ConcurrentHashMap<>();

    public static void publishService(Class<?> serviceClz, Object service) 
        map.put(serviceClz.getName(), service);
    

    public static Object getService(String serviceName) 
        return map.get(serviceName);
    


传输层

传输层即客户端与服务端之间的直接通信,使用Netty的话很方便,直接上代码。

  • 客户端代码

代码比较简单,都在注释里面了,这里主要是要把后面发送数据需要用到的channel保存下来。

NettyClient.java

package com.coolcoding.rpc.client;

import com.coolcoding.rpc.codec.RpcDecoder;
import com.coolcoding.rpc.codec.RpcEncoder;
import com.coolcoding.rpc.model.RpcRequest;
import com.coolcoding.rpc.model.RpcResponse;
import com.coolcoding.rpc.serialize.SerializerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NiosocketChannel;

public class NettyClient 
   private Channel channel;

   // 发送rpc request
   public void sendRpcRequest(RpcRequest rpcRequest) throws Exception 
       try 
           // 使用channel直接写数据
           this.channel.writeAndFlush(rpcRequest).sync();
        catch (Exception e) 
           throw e;
       
   

   // 启动客户端,连接服务端
   public void start(String host, int port) 
       EventLoopGroup group = new NioEventLoopGroup();
       Bootstrap bootstrap = new Bootstrap();

       try 
           // 配置参数
           bootstrap.group(group)
                   .channel(NioSocketChannel.class)
                   .handler(new ChannelInitializer<SocketChannel>() 
                       @Override
                       public void initChannel(SocketChannel channel) throws Exception 
                           channel.pipeline()
                                   .addLast(new RpcEncoder(SerializerFactory.getSerializer()))
                                   .addLast(new RpcDecoder(SerializerFactory.getSerializer(), RpcResponse.class))
                                   .addLast(new NettyClientHandler());
                       
                   ).option(ChannelOption.TCP_NODELAY, true)
                   .option(ChannelOption.SO_REUSEADDR, true)
                   .option(ChannelOption.SO_KEEPALIVE, true);

           // 客户端连接到服务端
           this.channel = bootstrap.connect(host, port).sync().channel();

           System.out.println("conn to server success...");
        catch (Exception e) 
           e.printStackTrace();
        finally 
           // 这里不能把group关了,关了EventLoop就关了,后面发送请求发不出去了。
       
   


NettyClientHandle.java

package com.coolcoding.rpc.client;

import com.coolcoding.rpc.model.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * 客户端收到响应的处理器
 */
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> 

    @Override
    public void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception 
        // 取得发送数据时的绑定的队列,并把返回值放入进去
        Waiter.getQueue(msg.getId()).offer(msg);
        Waiter.removeQueue(msg.getId());
    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        cause.printStackTrace();
        ctx.close();
    


  • 服务端代码

服务端主要是监听端口,设置收到消息的处理器NettyServerHandler。

package com.coolcoding.rpc.server;


import com.coolcoding.rpc.codec.RpcDecoder;
import com.coolcoding.rpc.codec.RpcEncoder;
import com.coolcoding.rpc.invoker.InvokerFactory;
import com.coolcoding.rpc.model.RpcRequest;
import com.coolcoding.rpc.serialize.SerializerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer   

    private final int port;

    public NettyServer(int port) 
        this.port = port;
    

    // 启动服务端
    public void start() 
        // 接受连接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 处理业务
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try 
            // 配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() 
                        @Override
                        public void initChannel(SocketChannel channel) throws Exception 
                            channel.pipeline()
                                    .addLast(new RpcDecoder(SerializerFactory.getSerializer(), RpcRequest.class))
                                    .addLast(new RpcEncoder(SerializerFactory.getSerializer()))
                                    .addLast(new NettyServerHandler(InvokerFactory.getInvoker()));
                        
                    ).option(ChannelOption.SO_BACKLOG, 128)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 绑定端口,启动监听
            ChannelFuture future = bootstrap.bind(port).sync();

            System.out.println("server started...");

            // 这里会阻塞
            future.channel().closeFuture().sync();
         catch (Exception e) 
            e.printStackTrace();
         finally 
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        

    



NettyServerHandler.java

package com.coolcoding.rpc.server;

import com.coolcoding.rpc.model.RpcRequest;
import com.coolcoding.rpc.invoker.Invoker;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * 服务端收到请求的处理器
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> 

    /**
     * 调用器
     */
    private Invoker invoker;

    public NettyServerHandler(Invoker invoker) 
        this.invoker = invoker;
    

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception 
        // 通过调用器调用到具体的服务进行处理
        ctx.writeAndFlush(invoker.invoke(msg));
    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        cause.printStackTrace();
        ctx.close();
    



协议层

协议层主要是对请求响应的编解码、序列化等操作,当然,像dubbo那种复杂的框架还包含对传输协议等的封装。

编解码器

RpcEncoder.java

package com.coolcoding.rpc.codec;

import com.coolcoding.rpc.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
以上是关于如何使用Netty打造工业级RPC框架的主要内容,如果未能解决你的问题,请参考以下文章

RPC框架motan: 通信框架netty

搜狗开源 srpc,C++ 通用 RPC 框架!

#yyds干货盘点# 基于Netty,20分钟手写一个RPC框架

netty实现rpc框架

基于Netty和SpringBoot实现一个轻量级RPC框架-Client篇

大厨小鲜——基于Netty自己动手编写RPC框架