手写一个RPC框架

Posted wen-pan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手写一个RPC框架相关的知识,希望对你有一定的参考价值。

代码地址https://gitee.com/mr_wenpan/basis-simple-rpc

一、项目介绍

①、项目基础介绍

【 simple-rpc】是一款简单的rpc服务框架,主要用于学习造轮子!以便于透彻的理解RPC原理和过程,以及spring、springboot、netty中相关技术运用。

【 simple-rpc】主要实现了两个版本

  • simple-rpc-like-feign分支】实现了类似于openfeign调用相关功能(openFeign底层通信是基于HTTP,一般是使用HttpClient等相关HTTP客户端发起的调用(这也是openfeign比较低效的原因之一)。在simple-rpc-like-feign分支中将底层通信替换为了netty)。但在使用方式上和openfeign非常相似,使用上非常便捷。
  • master分支】实现了类似dubbo rpc相关功能(底层通信基于netty),使用上和dubbo比较类似。
  • 下面主要介绍master分支实现的功能!!!

②、项目核心功能说明

  • 该项目实现了一个简单rpc调用,使用上类似于dubbo,非常方便,只需要基于一个[注解 + 接口]就能完成远程调用。
  • 以nacos作为注册中心(后面可拓展为支持多种类型的注册中心)
  • 消息序列化算法可由使用方动态配置(目前支持Java、JSON两种序列化算法)
  • 底层基于netty实现【服务提供方】和【服务消费方】进行通信,并且使用自定义消息编码解码器(MessageCodecSharableProtocolFrameDecoder)解决消息粘包和半包问题
    • 自定义rpc请求消息和rpc响应消息的编码解码器LengthFieldBasedFrameDecoder,自定义消息发送的格式。
    • 自定义帧解码器(预设长度解码器)ProtocolFrameDecoder,按自定义规则解码入站消息。
  • 基于netty提供的promise进行异步rpc调用,调用方并不用同步等待结果响应
  • 动态可插拔,基于@EnableSimpleRpcClients注解开启或关闭simple rpc功能
  • 在调用方只需要使用@SimpleRpcClientReference注解注入远程rpc接口,就可以进行RPC调用(像调用本地方法一样去进行远程调用)
  • server端,在底层netty通信时提供了RpcRequestMessageHandlerExecutor线程池,支持用户动态配置或覆盖默认线程池,提升通信channel的效率

二、启动步骤

首先要搞清楚如何将项目先正常跑起来,测试一把然后再去关注具体实现!

  • 拉取项目到本地,并且在idea中打开https://gitee.com/mr_wenpan/basis-simple-rpc
  • simple-rpc-starter模块执行mvn clean install到本地maven仓库
  • simple-rpc-providersimple-rpc-consumer这两个测试模块的application.yml配置文件中开启相关配置
    • 主要配置simple rpc server端监听的端口(默认8888端口)
    • rpc调用最大等待时间(默认1分钟)
    • nacos注册中心地址
  • simple-rpc-providersimple-rpc-consumer模块启动类上使用@EnableSimpleRpcClients注解开启simple rpc功能
  • 启动simple-rpc-provider模块
  • 启动simple-rpc-consumer模块
  • 浏览器请求接口:http://localhost:8081/v1/consumer-hello?name=xxxx
  • 观察浏览器返回和两个服务的控制台输出

当然上面的步骤在工程源代码里已经配置好了,只需要把代码拉下来,然后将nacos地址替换掉,直接启动运行就可以了。

三、RPC简单介绍

大致流程

时序图

四、Simple Rpc整体结构

五服务提供方和服务消费方启动流程简述

①、服务提供方启动流程简述

②、服务消费方启动流程简述

六、核心代码

1、客户端代理对象创建

public class SimpleRpcClientReferencePostProcessor implements BeanPostProcessor 

    private static final Map<Class<?>, Object> SIMPLE_RPC_REFERENCE_MAP = new ConcurrentHashMap<>();

    /**
     * bean初始化之前
     *
     * @param bean     bean
     * @param beanName bean名称
     * @return java.lang.Object
     * @author Mr_wenpan@163.com 2022/1/24 12:47 下午
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException 
        // 扫描含有@SimpleRpcClientReference注解的类
        Class<?> objClz = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass();
        try 
            for (Field field : objClz.getDeclaredFields()) 
                SimpleRpcClientReference reference = field.getAnnotation(SimpleRpcClientReference.class);
                if (Objects.isNull(reference)) 
                    continue;
                
                // 创建代理对象
                Object proxyService = getProxyService(field.getType());
                // 反射设置属性值
                field.setAccessible(true);
                ReflectionUtils.setField(field, bean, proxyService);
            
         catch (Exception e) 
            throw new BeanCreationException(beanName, e);
        
        return bean;
    

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException 

        return bean;
    

    /**
     * 获取代理对象,先从缓存中获取,如果本地缓存中没有则创建一个代理对象并返回
     *
     * @param type 类型
     * @return java.lang.Object 代理对象
     * @author Mr_wenpan@163.com 2022/1/24 4:05 下午
     */
    private static Object getProxyService(Class<?> type) 
        Assert.isTrue(type.isInterface(), "@SimpleRpcClientReference can only be specified on interface");
        // double check
        Object target = SIMPLE_RPC_REFERENCE_MAP.get(type);

        if (target != null) 
            return target;
        

        synchronized (SimpleRpcClientReferencePostProcessor.class) 
            target = SIMPLE_RPC_REFERENCE_MAP.get(type);
            if (target != null) 
                return target;
            
            target = SimpleRpcClientProxyCreateFactory.createProxyService(type);
            SIMPLE_RPC_REFERENCE_MAP.put(type, target);
            return target;
        

    


@Slf4j
public class SimpleRpcClientProxyCreateFactory 

    /**
     * 通过接口的class创建该接口的代理对象(这里直接基于JDK提供的创建动态代理的工具来创建代理对象)
     *
     * @param serviceClass 接口的class
     * @return T 代理对象
     */
    public static <T> T createProxyService(Class<T> serviceClass) 
        // 该接口的Class对象是被那个类加载器加载的
        ClassLoader classLoader = serviceClass.getClassLoader();
        // 获取到该接口所有的interface(这里先假设就只有一个接口)
        Class<?>[] interfaces = serviceClass;

        // jdk代理必须的handler,代理对象的方法执行就会调用这里的invoke方法。自动传入调用的方法 + 方法参数
        InvocationHandler invocationHandler = new InvocationHandler() 
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable 
                // 1、将方法调用转换为rpc请求消息(sequenceId为消息唯一编号,当请求响应时可以通过这个ID找到对应的等待的Promise,然后唤醒)
                int sequenceId = SequenceIdGenerator.nextId();
                // 封装RPC请求消息
                RpcRequestMessage rpcRequestMessage = new RpcRequestMessage(sequenceId,
                        serviceClass.getName(),
                        method.getName(),
                        method.getReturnType(),
                        method.getParameterTypes(),
                        args);

                // 2、将消息对象发送出去(这里channel不会阻塞等待消息返回)
                Channel channel = getChannelByProviderInfName(serviceClass.getName());
                channel.writeAndFlush(rpcRequestMessage);
                System.out.println("channel.writeAndFlush(rpcRequestMessage);");

                // 3、准备一个空的promise对象来接收server返回的结果
                // 指定promise对象异步接收结果的线程,这里使用发送消息的channel的线程来接收消息(getChannel().eventLoop())
                DefaultPromise<Object> promise = new DefaultPromise<>(channel.eventLoop());

                // 将这个promise缓存起来,以便于server响应结果回来的时候能够通过消息的sequenceId正确的找到这个promise
                // 那么这个promise在哪里接收的结果呢?当然是在响应handler里,因为只有当server端响应了promise才应该被设置值
                RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);

                // 调用接口的线程等待,直到promise有结果(正常或异常)
                SimpleRpcProperties simpleRpcProperties = ApplicationContextHelper.getContext().getBean(SimpleRpcProperties.class);
                promise.await(simpleRpcProperties.getMaxWaitTime() * 1000);

                System.out.println("promise结果已返回, promise = " + promise.toString());
                // server返回结果后结束上面的阻塞,执行这里
                if (promise.isSuccess()) 
                    return promise.getNow();
                 else 
                    throw new SimpleRpcCallFailedException(String.format(
                            "simple rpc call failed, interface providerName is : [%s]", serviceClass.getName()), promise.cause());
                
            
        ;

        Object proxy = Proxy.newProxyInstance(classLoader, interfaces, invocationHandler);

        // 返回代理对象
        return (T) proxy;
    

    /**
     * 同步锁对象
     */
    private static final Object LOCK = new Object();

    /**
     * 获取唯一的 channel 对象(需要通过这个channel将数据发送给server端)
     */
    public static Channel getChannelByProviderInfName(@NonNull String interfaceName) throws NacosException 
        // 从nacos上随机获取一个可用的服务提供者实例
        NacosRegistrarManager nacosRegistrarManager = ApplicationContextHelper.getContext().getBean(NacosRegistrarManager.class);
        Instance instance = nacosRegistrarManager.getRandomInstanceByServerName(interfaceName);
        if (Objects.isNull(instance)) 
            throw new NoInstancesAvailableException(String.format("can not found available instance by service name [%s]", interfaceName));
        
        // [服务名 + ip + 端口] 确定provider的唯一性
        String ip = instance.getIp();
        int port = instance.getPort();
        String serviceName = instance.getServiceName();
        String uniqueKey = serviceName + ip + SimpleRpcConstants.Symbol.COLON + port;
        Channel channel = SimpleRpcServerChannelRegistrar.getChannel(uniqueKey);

        if (Objects.nonNull(channel)) 
            return channel;
        

        return initProviderChannel(serviceName, ip, port);
    

    /**
     * 初始化到provider 的 channel
     *
     * @param serviceName serviceName
     * @param ip          provider的ip
     * @param port        provider监听的端口
     * @return io.netty.channel.Channel
     * @author Mr_wenpan@163.com 2022/1/21 11:51 上午
     */
    private static Channel initProviderChannel(String serviceName, String ip, int port) 
        // 唯一key
        String uniqueKey = serviceName + ip + SimpleRpcConstants.Symbol.COLON + port;
        Channel channel = SimpleRpcServerChannelRegistrar.getChannel(uniqueKey);
        // 双重检查
        if (Objects.nonNull(channel)) 
            return channel;
        
        synchronized (LOCK) 
            // 通过uniqueKey获取到某个provider的channel
            channel = SimpleRpcServerChannelRegistrar.getChannel(uniqueKey);
            if (Objects.nonNull(channel)) 
                return channel;
            
            // 创建到该provider的一个新channel并缓存起来
            // 客户端事件线程池组
            NioEventLoopGroup group = new NioEventLoopGroup();
            // 日志处理handler
            LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
            // 自定义编码解码器
            MessageCodecSharable messageCodec = new MessageCodecSharable();
            // rpc调用响应消息处理handler
            RpcResponseMessageHandler rpcHandler = new RpcResponseMessageHandler();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NiosocketChannel.class);
            bootstrap.group(group);

            // 绑定handler
            bootstrap.handler(new ChannelInitializer<SocketChannel>() 
                // 建立连接后为该client channel添加handler
                @Override
                protected void initChannel(SocketChannel ch) throws Exception 
                    // 自定义帧解码器
                    ch.pipeline().addLast(new ProtocolFrameDecoder());
                    // 日志处理器
                    // ch.pipeline().addLast(loggingHandler);
                    // 自定义编码解码器(按自定义格式将消息解码,然后传递给下一个handler)
                    ch.pipeline().addLast(messageCodec);
                    // rpc 调用响应消息处理handler
                    ch.pipeline().addLast(rpcHandler);
                
            );

            try 
                // 通过provider的ip + 端口,发起连接并注册到本地缓存
                channel = bootstrap.connect(ip, port).sync().channel();
                SimpleRpcServerChannelRegistrar.registerChannel(uniqueKey, channel);
                // 注册channel关闭监听事件
                channel.closeFuture().addListener(future -> 
                    // help gc and reconnect
                    SimpleRpcServerChannelRegistrar.removeChannel(uniqueKey);
                    group.shutdownGracefully();
                    log.warn("server channel shutdown, shutdown [NioEventLoopGroup] gracefully now.");
                );
             catch (Exception e) 
                throw new SimpleRpcChannelException("create simple rpc provider channel occur excetion.", e);
            
            return channel;
        
    

2、server端接口暴露到注册中心

public class SimpleRpcServerExposeRunner implements CommandLineRunner 

    /**
     * 暴露的接口信息缓存
     */
    private final static Set<String> INTERFACE_REGISTER_INFO_SET = new ConcurrentHashSet<>();

    private final AtomicBoolean isInit = new AtomicBoolean(false);

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private final SimpleRpcProperties simpleRpcProperties;

    private final NacosRegistrarManager nacosRegistrarManager;

    public SimpleRpcServerExposeRunner(SimpleRpcProperties simpleRpcProperties,
                                       NacosRegistrarManager nacosRegistrarManager) 
        this.simpleRpcProperties = simpleRpcProperties;
        this.nacosRegistrarManager = nacosRegistrarManager;
    

    @Override
    public void run(String... args) throws Exception 
        // 防止并发
        if (!isInit.compareAndSet(false, true)) 
            logger.info("nacos register has been is init...");
            return;
        

        // 扫描到标注有@SimpleRpcServerExpose注解的类
        ApplicationContext context = ApplicationContextHelper.getContext();
        Map<String, Object> beansWithAnnotation = context.getBeansWithAnnotation(SimpleRpcServerExpose.class);

        String currentHostIp = NetworkUtil.localIp();
        InetSocketAddress inetSocketAddress = new InetSocketAddress(currentHostIp, simpleRpcProperties.getSimpleRpcServerPort());

        // 接口暴露到注册中心
        beansWithAnnotation.forEach((key, value) -> 
            Class<?>[] interfaces = value.getClass().getInterfaces();
            if (!Objects.isNull(interfaces)) 
                // 将接口注册到nacos(以接口全限定名为服务名)
                try 
                    for (Class<?> anInterface : interfaces) 
                        String interfaceName = anInterface.getName();
                        INTERFACE_REGISTER_INFO_SET.add(interfaceName);
                        nacosRegistrarManager.registerServer(interfaceName, inetSocketAddress);
                    
                 catch (NacosException e) 
                    throw new RuntimeException("can not register to nacos, occur exception.", e);
                
            
        );

        // 添加钩子,关闭时清理注册信息
        Runtime.getRuntime().addShutdownHook(new Thread(() -> 
            INTERFACE_REGISTER_INFO_SET.forEach(interfaceName -> 
                try 
                    nacosRegistrarManager.clearRegister(interfaceName);
                 catch (Exception ex) 
                    logger.error("hook clear register info from nacos occur exception.");
                
            );
        ));
    

    /**
     * 获取到该实例所有已经暴露的接口
     */
    public 手写一个RPC框架

手写一个自己的 RPC 框架?

手写一个简单的RPC框架

花了2个月手写一个RPC框架!

是时候手写一个RPC框架了

趁热打铁,我们今天来手写一个RPC框架……