如何写一个 RPC 框架
Posted sp42a
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何写一个 RPC 框架相关的知识,希望对你有一定的参考价值。
开始造轮子之旅, 本期轮子:RPC 框架。在后续一段时间里, 我会写一系列文章来讲述如何实现一个 RPC 框架(我已经实现了一个示例框架, 代码在我的 github上)。 这是系列第一篇文章, 主要从整体角度讲述了一个 RPC 框架组成结构与关注点。
RPC框架的关注点
首先,什么是 RPC?RPC的全称是 Remote Procedure Call,远程过程调用。RPC 框架有很多,比如 hsf、dubbo 等等。借助 RPC 框架,我们在写业务代码的时候可以不需要去考虑服务之间的通信等问题,在调用远程服务的时候就像调用本地的方法那么简单。
那么,要写一个 RPC 框架应该由哪些部分组成,关注哪些东西?
简化本地调用流程
既然我们要像调用本地方法那样调用远程服务, 那么就应该生成代理来隐藏调用远程服务的细节。 这些细节包括但不限于以下所列出的关注点。
服务发现与服务注册
如果我们想在 Service A 中调用 Service B,那么我们首先得知道 Service B 的地址。 所以,我们需要有一个服务注册中心,通过这个中心,服务可以把自己的信息注册进来,也可以获取到别的服务的信息。客户端也需要watch服务注册中心的目标服务的地址的变化。
网络通信
- 服务和服务之间的网络通信模型, NIO/IO 等等
- 客户端如何复用与服务端的连接, 而不是每次请求都重新创建一个新连接?
- 客户端收到返回后,如何知道是哪个请求的返回并且做出正确处理?
消息的序列化
服务间通信的消息通过什么方式进行序列化? hessian,XML、JSON、Protobuf、……, 甚至 Java 原生的序列化方式, 你总得选择一个。
负载均衡
客户端通过服务注册中心拿到一堆地址,该调哪个呢?最简单的方式,可以通过 RR、WRR 的方式去做 LB。如果做得更深入一些,可以从以下角度去优化:
- 根据服务实例的 metrics 做出动态调整, 比如响应时间等
- 利用一致性哈希, 提高本地缓存利用率
容灾
- 健康监测: 在某一个服务节点挂掉的时候, 如何在服务注册中心删去这个服务地址?
- 服务调用超时与重试: 在调用一个服务实例的时候,如果超时或者报错,怎么处理?
- 服务限流:如何限制最大并发数?这个又可以从客户端和服务端两个角度分析。
利用 Bean 容器和动态代理简化客户端代码
在本系列第一篇文章中,我们说到了 RPC 框架需要关注的第一个点,通过创建代理的方式来简化客户端代码。
如果不使用代理?
如果我们不用代理去帮我们操心那些服务寻址、网络通信的问题,我们的代码会怎样?我们每调用一次远端服务,就要在业务代码中重复一遍那些复杂的逻辑,这肯定是不能接受的!
目标代码
而我们的目标是写出简洁的代码,就像这样:
// 这个接口应该被单独打成一个jar包,同时被server和client所依赖
@RPCService(HelloService.class)
public interface HelloService
String hello(String name);
@Component
@Slf4j
public class AnotherService
@Autowired
HelloService helloService;
public void callHelloService()
//就像调用本地方法一样自如!
log.info("Result of callHelloService: ", helloService.hello("world"));
@EnableRPCClients(basePackages = "pw.hshen.hrpc")
public class HelloClient
public static void main(String[] args) throws Exception
ApplicationContext context = new ClassPathXmlApplicationContext("spring.xml");
AnotherService anotherService = context.getBean(AnotherService.class);
anotherService.callHelloService();
代码中的 AnotherService
可以简单调用远端的 HelloService
的方法,就像调用本地的 service 一样简单! 在这段代码中,HelloService 可以视作 server, 而 AnotherService 则是它的调用者,可以视作是 client。
实现思路
获取要被创建代理的接口
首先,我们要知道需要为哪些接口来创建代理。
我们需要为这种特殊的接口创建一个注解来标注,即 RPCService。然后我们就可以通过扫描某个包下面所有包含这个注解的interface来获取了。那么,怎么知道要扫描哪个包呢?方法就是获取 MainClass 的 EnableRPCClients 注解的 basePackages 的值。
为这些接口创建动态代理
我们可以利用 jdk 的动态代理来做这件事儿:
// Interface是需要被创建代理的那个接口
Proxy.newProxyInstance(
interface.getClassLoader(),
new Class<?>[]interface,
new InvocationHandler()
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
// TODO: Do RPC action here and return the result
将创建出来的代理对象注册到 bean 容器中
关于如何动态向 spring 容器中注册自定义的 bean, 可以参考这篇文章。在我的框架中, 我选择了使用 BeanDefinitionRegistryPostProcessor 所提供的 hook。
注入到 bean 容器之后,我们就可以在代码中愉快的用 Autowired 等注解来获取所创建的代理啦!
完整代码
定义需要的注解
/**
* @author hongbin
* Created on 22/10/2017
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface EnableRPCClients
String[] basePackages() default ;
/**
* @author hongbin
* Created on 21/10/2017
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
@Inherited
public @interface RPCService
Class<?> value();
利用 Spring 的 hook 机制, 向容器中注册我们自己的 proxy bean:
/**
* Register proxy bean for required client in bean container.
* 1. Get interfaces with annotation RPCService
* 2. Create proxy bean for the interfaces and register them
*
* @author hongbin
* Created on 21/10/2017
*/
@Slf4j
@RequiredArgsConstructor
public class ServiceProxyProvider extends PropertySourcesPlaceholderConfigurer implements BeanDefinitionRegistryPostProcessor
@NonNull
private ServiceDiscovery serviceDiscovery;
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException
log.info("register beans");
ClassPathScanningCandidateComponentProvider scanner = getScanner();
scanner.addIncludeFilter(new AnnotationTypeFilter(RPCService.class));
for (String basePackage: getBasePackages())
Set<BeanDefinition> candidateComponents = scanner
.findCandidateComponents(basePackage);
for (BeanDefinition candidateComponent : candidateComponents)
if (candidateComponent instanceof AnnotatedBeanDefinition)
AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
BeanDefinitionHolder holder = createBeanDefinition(annotationMetadata);
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
private ClassPathScanningCandidateComponentProvider getScanner()
return new ClassPathScanningCandidateComponentProvider(false)
@Override
protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition)
if (beanDefinition.getMetadata().isIndependent())
if (beanDefinition.getMetadata().isInterface()
&& beanDefinition.getMetadata().getInterfaceNames().length == 1
&& Annotation.class.getName().equals(beanDefinition.getMetadata().getInterfaceNames()[0]))
try
Class<?> target = Class.forName(beanDefinition.getMetadata().getClassName());
return !target.isAnnotation();
catch (Exception ex)
log.error("Could not load target class: , ",
beanDefinition.getMetadata().getClassName(), ex);
return true;
return false;
;
private BeanDefinitionHolder createBeanDefinition(AnnotationMetadata annotationMetadata)
String className = annotationMetadata.getClassName();
log.info("Creating bean definition for class: ", className);
BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(ProxyFactoryBean.class);
String beanName = StringUtils.uncapitalize(className.substring(className.lastIndexOf('.') + 1));
definition.addPropertyValue("type", className);
definition.addPropertyValue("serviceDiscovery", serviceDiscovery);
return new BeanDefinitionHolder(definition.getBeanDefinition(), beanName);
private Set<String> getBasePackages()
String[] basePackages = getMainClass().getAnnotation(EnableRPCClients.class).basePackages();
Set set = new HashSet<>();
Collections.addAll(set, basePackages);
return set;
private Class<?> getMainClass()
for (final Map.Entry<String, String> entry : System.getenv().entrySet())
if (entry.getKey().startsWith("JAVA_MAIN_CLASS"))
String mainClass = entry.getValue();
log.debug("Main class: ", mainClass);
try
return Class.forName(mainClass);
catch (ClassNotFoundException e)
throw new IllegalStateException("Cannot determine main class.");
throw new IllegalStateException("Cannot determine main class.");
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException
对应的 ProxyBeanFactory:
/**
* FactoryBean for service proxy
*
* @author hongbin
* Created on 24/10/2017
*/
@Slf4j
@Data
public class ProxyFactoryBean implements FactoryBean<Object>
private Class<?> type;
private ServiceDiscovery serviceDiscovery;
@SuppressWarnings("unchecked")
@Override
public Object getObject() throws Exception
return Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]type, this::doInvoke);
@Override
public Class<?> getObjectType()
return this.type;
@Override
public boolean isSingleton()
return true;
private Object doInvoke(Object proxy, Method method, Object[] args) throws Throwable
// TODO:这里处理服务发现、负载均衡、网络通信等逻辑
就这样, 我们实现了客户端启动时的扫包、创建代理的过程,接下来要做的事情就只是填充代理的逻辑了。
服务注册与服务发现
在系列的第一篇文章中提到,我们的 RPC 框架需要有一个服务注册中心。 通过这个中心,服务可以把自己的信息注册进来,也可以获取到别的服务的信息(例如ip、端口、版本信息等)。这一块有个统一的名称,叫服务发现。
对于服务发现,现在有很多可供选择的工具,例如 zookeeper, etcd 或者是 consul 等。 有一篇文章专门对这三个工具做了对比: 服务发现:Zookeeper vs etcd vs Consul。 在我的框架中, 我选择使用 Consul 来实现服务发现。对于 Consul 不了解的朋友可以去看我之前写的关于 Consul 的博客。
Consul 客户端也有一些 Java 的实现,我用到了 consul-api。
服务注册
首先,我们定义一个接口:
public interface ServiceRegistry
void register(String serviceName, ServiceAddress serviceAddress);
这个接口很简单,向服务注册中心注册自己的地址。
对应的 consul 的实现:
public class ConsulServiceRegistry implements ServiceRegistry
private ConsulClient consulClient;
public ConsulServiceRegistry(String consulAddress)
String address[] = consulAddress.split(":");
ConsulRawClient rawClient = new ConsulRawClient(address[0], Integer.valueOf(address[1]));
consulClient = new ConsulClient(rawClient);
@Override
public void register(String serviceName, ServiceAddress serviceAddress)
NewService newService = new NewService();
newService.setId(generateNewIdForService(serviceName, serviceAddress));
newService.setName(serviceName);
newService.setTags(new ArrayList<>());
newService.setAddress(serviceAddress.getIp());
newService.setPort(serviceAddress.getPort());
// Set health check
NewService.Check check = new NewService.Check();
check.setTcp(serviceAddress.toString());
check.setInterval("1s");
newService.setCheck(check);
consulClient.agentServiceRegister(newService);
private String generateNewIdForService(String serviceName, ServiceAddress serviceAddress)
// serviceName + ip + port
return serviceName + "-" + serviceAddress.getIp() + "-" + serviceAddress.getPort();
这里我向 consul 注册服务的时候,还设定了健康状态检查方式为 TCP 连接方式, 即每过一秒,consul 都会尝试与该地址建立 TCP 连接以验证服务状态。 除了 TCP 连接之外,consul 还提供了 http、ttl 等多种检查方式。
另外一点值得注意的是,要确保 id 绝对唯一。 我能想到的比较直观的解决方案是 serviceName + 本机 ip + 本机 port 的组合。
服务发现
对于服务发现而言, 值得注意的是,我们需要去 watch consul 上值的变化, 并更新保存在应用中的服务的地址。
首先,我们定义一个接口:
public interface ServiceDiscovery
String discover(String serviceName);
这个接口很简单,传入 serviceName,获取一个可以访问的该 service 的地址。对应的 consul 的实现:
public class ConsulServiceDiscovery implements ServiceDiscovery
private ConsulClient consulClient;
// 这里我用到了LoadBalancer, 关于LB这块,后续文章会专门讲述
Map<String, LoadBalancer<ServiceAddress>> loadBalancerMap = new ConcurrentHashMap<>();
public ConsulServiceDiscovery(String consulAddress)
String[] address = consulAddress.split(":");
ConsulRawClient rawClient = new ConsulRawClient(address[0], Integer.valueOf(address[1]));
consulClient = new ConsulClient(rawClient);
@Override
public String discover(String serviceName)
List<HealthService> healthServices;
if (!loadBalancerMap.containsKey(serviceName))
healthServices = consulClient.getHealthServices(serviceName, true, QueryParams.DEFAULT)
.getValue();
loadBalancerMap.put(serviceName, buildLoadBalancer(healthServices));
// Watch consul
longPolling(serviceName);
return loadBalancerMap.get(serviceName).next().toString();
private void longPolling(String serviceName)
new Thread(new Runnable()
@Override
public void run()
long consulIndex = -1;
do
QueryParams param =
QueryParams.Builder.builder()
.setIndex(consulIndex)
.build();
Response<List<HealthService>> healthyServices =
consulClient.getHealthServices(serviceName, true, param);
consulIndex = healthyServices.getConsulIndex();
log.debug("consul index for is: ", serviceName, consulIndex);
List<HealthService> healthServices = healthyServices.getValue()以上是关于如何写一个 RPC 框架的主要内容,如果未能解决你的问题,请参考以下文章