如何写一个 RPC 框架

Posted sp42a

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何写一个 RPC 框架相关的知识,希望对你有一定的参考价值。

开始造轮子之旅, 本期轮子:RPC 框架。在后续一段时间里, 我会写一系列文章来讲述如何实现一个 RPC 框架(我已经实现了一个示例框架, 代码在我的 github上)。 这是系列第一篇文章, 主要从整体角度讲述了一个 RPC 框架组成结构与关注点。

RPC框架的关注点

首先,什么是 RPC?RPC的全称是 Remote Procedure Call,远程过程调用。RPC 框架有很多,比如 hsf、dubbo 等等。借助 RPC 框架,我们在写业务代码的时候可以不需要去考虑服务之间的通信等问题,在调用远程服务的时候就像调用本地的方法那么简单。

那么,要写一个 RPC 框架应该由哪些部分组成,关注哪些东西?

简化本地调用流程

既然我们要像调用本地方法那样调用远程服务, 那么就应该生成代理来隐藏调用远程服务的细节。 这些细节包括但不限于以下所列出的关注点。

服务发现与服务注册

如果我们想在 Service A 中调用 Service B,那么我们首先得知道 Service B 的地址。 所以,我们需要有一个服务注册中心,通过这个中心,服务可以把自己的信息注册进来,也可以获取到别的服务的信息。客户端也需要watch服务注册中心的目标服务的地址的变化。

网络通信

  • 服务和服务之间的网络通信模型, NIO/IO 等等
  • 客户端如何复用与服务端的连接, 而不是每次请求都重新创建一个新连接?
  • 客户端收到返回后,如何知道是哪个请求的返回并且做出正确处理?

消息的序列化

服务间通信的消息通过什么方式进行序列化? hessian,XML、JSON、Protobuf、……, 甚至 Java 原生的序列化方式, 你总得选择一个。

负载均衡

客户端通过服务注册中心拿到一堆地址,该调哪个呢?最简单的方式,可以通过 RR、WRR 的方式去做 LB。如果做得更深入一些,可以从以下角度去优化:

  • 根据服务实例的 metrics 做出动态调整, 比如响应时间等
  • 利用一致性哈希, 提高本地缓存利用率

容灾

  • 健康监测: 在某一个服务节点挂掉的时候, 如何在服务注册中心删去这个服务地址?
  • 服务调用超时与重试: 在调用一个服务实例的时候,如果超时或者报错,怎么处理?
  • 服务限流:如何限制最大并发数?这个又可以从客户端和服务端两个角度分析。

利用 Bean 容器和动态代理简化客户端代码

在本系列第一篇文章中,我们说到了 RPC 框架需要关注的第一个点,通过创建代理的方式来简化客户端代码。

如果不使用代理?

如果我们不用代理去帮我们操心那些服务寻址、网络通信的问题,我们的代码会怎样?我们每调用一次远端服务,就要在业务代码中重复一遍那些复杂的逻辑,这肯定是不能接受的!

目标代码

而我们的目标是写出简洁的代码,就像这样:

// 这个接口应该被单独打成一个jar包,同时被server和client所依赖
@RPCService(HelloService.class)
public interface HelloService 

    String hello(String name);


@Component
@Slf4j
public class AnotherService 
    @Autowired
    HelloService helloService;

    public void callHelloService() 
        //就像调用本地方法一样自如!
        log.info("Result of callHelloService: ", helloService.hello("world"));
    


@EnableRPCClients(basePackages = "pw.hshen.hrpc")
public class HelloClient 

    public static void main(String[] args) throws Exception 
        ApplicationContext context = new ClassPathXmlApplicationContext("spring.xml");
        AnotherService anotherService = context.getBean(AnotherService.class);
        anotherService.callHelloService();
    

代码中的 AnotherService 可以简单调用远端的 HelloService 的方法,就像调用本地的 service 一样简单! 在这段代码中,HelloService 可以视作 server, 而 AnotherService 则是它的调用者,可以视作是 client。

实现思路

获取要被创建代理的接口

首先,我们要知道需要为哪些接口来创建代理。

我们需要为这种特殊的接口创建一个注解来标注,即 RPCService。然后我们就可以通过扫描某个包下面所有包含这个注解的interface来获取了。那么,怎么知道要扫描哪个包呢?方法就是获取 MainClass 的 EnableRPCClients 注解的 basePackages 的值。

为这些接口创建动态代理

我们可以利用 jdk 的动态代理来做这件事儿:

// Interface是需要被创建代理的那个接口
Proxy.newProxyInstance(
            interface.getClassLoader(),
            new Class<?>[]interface,
            new InvocationHandler() 
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable 
            // TODO: Do RPC action here and return the result 
            

将创建出来的代理对象注册到 bean 容器中

关于如何动态向 spring 容器中注册自定义的 bean, 可以参考这篇文章。在我的框架中, 我选择了使用 BeanDefinitionRegistryPostProcessor 所提供的 hook。

注入到 bean 容器之后,我们就可以在代码中愉快的用 Autowired 等注解来获取所创建的代理啦!

完整代码

定义需要的注解

/**
 * @author hongbin
 * Created on 22/10/2017
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface EnableRPCClients 
    String[] basePackages() default ;

/**
 * @author hongbin
 * Created on 21/10/2017
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
@Inherited
public @interface RPCService 

    Class<?> value();

利用 Spring 的 hook 机制, 向容器中注册我们自己的 proxy bean:

/**
 * Register proxy bean for required client in bean container.
 * 1. Get interfaces with annotation RPCService
 * 2. Create proxy bean for the interfaces and register them
 *
 * @author hongbin
 * Created on 21/10/2017
 */
@Slf4j
@RequiredArgsConstructor
public class ServiceProxyProvider extends PropertySourcesPlaceholderConfigurer implements BeanDefinitionRegistryPostProcessor 

    @NonNull
    private ServiceDiscovery serviceDiscovery;

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException 
        log.info("register beans");
        ClassPathScanningCandidateComponentProvider scanner = getScanner();
        scanner.addIncludeFilter(new AnnotationTypeFilter(RPCService.class));

        for (String basePackage: getBasePackages()) 
            Set<BeanDefinition> candidateComponents = scanner
                    .findCandidateComponents(basePackage);
            for (BeanDefinition candidateComponent : candidateComponents) 
                if (candidateComponent instanceof AnnotatedBeanDefinition) 
                    AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
                    AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();

                    BeanDefinitionHolder holder = createBeanDefinition(annotationMetadata);
                    BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
                
            
        
    

    private ClassPathScanningCandidateComponentProvider getScanner() 
        return new ClassPathScanningCandidateComponentProvider(false) 

            @Override
            protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) 
                if (beanDefinition.getMetadata().isIndependent()) 

                    if (beanDefinition.getMetadata().isInterface()
                            && beanDefinition.getMetadata().getInterfaceNames().length == 1
                            && Annotation.class.getName().equals(beanDefinition.getMetadata().getInterfaceNames()[0])) 

                        try 
                            Class<?> target = Class.forName(beanDefinition.getMetadata().getClassName());
                            return !target.isAnnotation();
                         catch (Exception ex) 

                            log.error("Could not load target class: , ",
                                    beanDefinition.getMetadata().getClassName(), ex);
                        
                    
                    return true;
                
                return false;
            
        ;
    

    private BeanDefinitionHolder createBeanDefinition(AnnotationMetadata annotationMetadata) 
        String className = annotationMetadata.getClassName();
        log.info("Creating bean definition for class: ", className);

        BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(ProxyFactoryBean.class);
        String beanName = StringUtils.uncapitalize(className.substring(className.lastIndexOf('.') + 1));

        definition.addPropertyValue("type", className);
        definition.addPropertyValue("serviceDiscovery", serviceDiscovery);

        return new BeanDefinitionHolder(definition.getBeanDefinition(), beanName);
    

    private Set<String> getBasePackages() 
        String[] basePackages = getMainClass().getAnnotation(EnableRPCClients.class).basePackages();
        Set set = new HashSet<>();
        Collections.addAll(set, basePackages);
        return set;
    

    private Class<?> getMainClass() 
        for (final Map.Entry<String, String> entry : System.getenv().entrySet()) 
            if (entry.getKey().startsWith("JAVA_MAIN_CLASS")) 
                String mainClass = entry.getValue();
                log.debug("Main class: ", mainClass);
                try 
                    return Class.forName(mainClass);
                 catch (ClassNotFoundException e) 
                    throw new IllegalStateException("Cannot determine main class.");
                
            
        
        throw new IllegalStateException("Cannot determine main class.");
    

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException 

    

对应的 ProxyBeanFactory:

/**
 * FactoryBean for service proxy
 *
 * @author hongbin
 * Created on 24/10/2017
 */
@Slf4j
@Data
public class ProxyFactoryBean implements FactoryBean<Object> 
    private Class<?> type;

    private ServiceDiscovery serviceDiscovery;

    @SuppressWarnings("unchecked")
    @Override
    public Object getObject() throws Exception 
        return Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]type, this::doInvoke);
    

    @Override
    public Class<?> getObjectType() 
        return this.type;
    

    @Override
    public boolean isSingleton() 
        return true;
    

    private Object doInvoke(Object proxy, Method method, Object[] args) throws Throwable 
        // TODO:这里处理服务发现、负载均衡、网络通信等逻辑
    

就这样, 我们实现了客户端启动时的扫包、创建代理的过程,接下来要做的事情就只是填充代理的逻辑了。

服务注册与服务发现

在系列的第一篇文章中提到,我们的 RPC 框架需要有一个服务注册中心。 通过这个中心,服务可以把自己的信息注册进来,也可以获取到别的服务的信息(例如ip、端口、版本信息等)。这一块有个统一的名称,叫服务发现。

对于服务发现,现在有很多可供选择的工具,例如 zookeeper, etcd 或者是 consul 等。 有一篇文章专门对这三个工具做了对比: 服务发现:Zookeeper vs etcd vs Consul。 在我的框架中, 我选择使用 Consul 来实现服务发现。对于 Consul 不了解的朋友可以去看我之前写的关于 Consul 的博客。

Consul 客户端也有一些 Java 的实现,我用到了 consul-api。

服务注册

首先,我们定义一个接口:

public interface ServiceRegistry 
    void register(String serviceName, ServiceAddress serviceAddress);

这个接口很简单,向服务注册中心注册自己的地址。

对应的 consul 的实现:

public class ConsulServiceRegistry implements ServiceRegistry 

    private ConsulClient consulClient;

    public ConsulServiceRegistry(String consulAddress) 
        String address[] = consulAddress.split(":");
        ConsulRawClient rawClient = new ConsulRawClient(address[0], Integer.valueOf(address[1]));
        consulClient = new ConsulClient(rawClient);
    

    @Override
    public void register(String serviceName, ServiceAddress serviceAddress) 
        NewService newService = new NewService();
        newService.setId(generateNewIdForService(serviceName, serviceAddress));
        newService.setName(serviceName);
        newService.setTags(new ArrayList<>());
        newService.setAddress(serviceAddress.getIp());
        newService.setPort(serviceAddress.getPort());

        // Set health check
        NewService.Check check = new NewService.Check();
        check.setTcp(serviceAddress.toString());
        check.setInterval("1s");
        newService.setCheck(check);

        consulClient.agentServiceRegister(newService);
    

    private String generateNewIdForService(String serviceName, ServiceAddress serviceAddress)
        // serviceName + ip + port
        return serviceName + "-" + serviceAddress.getIp() + "-" + serviceAddress.getPort();
    

这里我向 consul 注册服务的时候,还设定了健康状态检查方式为 TCP 连接方式, 即每过一秒,consul 都会尝试与该地址建立 TCP 连接以验证服务状态。 除了 TCP 连接之外,consul 还提供了 http、ttl 等多种检查方式。

另外一点值得注意的是,要确保 id 绝对唯一。 我能想到的比较直观的解决方案是 serviceName + 本机 ip + 本机 port 的组合。

服务发现

对于服务发现而言, 值得注意的是,我们需要去 watch consul 上值的变化, 并更新保存在应用中的服务的地址。

首先,我们定义一个接口:

public interface ServiceDiscovery 
    String discover(String serviceName);

这个接口很简单,传入 serviceName,获取一个可以访问的该 service 的地址。对应的 consul 的实现:

public class ConsulServiceDiscovery implements ServiceDiscovery 

    private ConsulClient consulClient;

    // 这里我用到了LoadBalancer, 关于LB这块,后续文章会专门讲述
    Map<String, LoadBalancer<ServiceAddress>> loadBalancerMap = new ConcurrentHashMap<>();

    public ConsulServiceDiscovery(String consulAddress) 
        String[] address = consulAddress.split(":");
        ConsulRawClient rawClient = new ConsulRawClient(address[0], Integer.valueOf(address[1]));
        consulClient = new ConsulClient(rawClient);
    

    @Override
    public String discover(String serviceName) 
        List<HealthService> healthServices;
        if (!loadBalancerMap.containsKey(serviceName)) 
            healthServices = consulClient.getHealthServices(serviceName, true, QueryParams.DEFAULT)
                    .getValue();
            loadBalancerMap.put(serviceName, buildLoadBalancer(healthServices));

            // Watch consul
            longPolling(serviceName);
        
        return loadBalancerMap.get(serviceName).next().toString();
    

    private void longPolling(String serviceName)
        new Thread(new Runnable() 
            @Override
            public void run() 
                long consulIndex = -1;
                do 

                    QueryParams param =
                            QueryParams.Builder.builder()
                                    .setIndex(consulIndex)
                                    .build();

                    Response<List<HealthService>> healthyServices =
                            consulClient.getHealthServices(serviceName, true, param);

                    consulIndex = healthyServices.getConsulIndex();
                    log.debug("consul index for  is: ", serviceName, consulIndex);

                    List<HealthService> healthServices = healthyServices.getValue()以上是关于如何写一个 RPC 框架的主要内容,如果未能解决你的问题,请参考以下文章

三百行代码完成一个简单的rpc框架

图文分析:如何利用Google的protobuf,来思考设计实现自己的RPC框架

深入浅出 RPC - 深入篇

深入浅出 RPC - 深入篇

深入浅出RPC——深入篇(转载)

带你手写基于 Spring 的可插拔式 RPC 框架介绍