RPC ---- 基于ZooKeeper为注册中心实现的RPC

Posted TheWhc

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RPC ---- 基于ZooKeeper为注册中心实现的RPC相关的知识,希望对你有一定的参考价值。

一、原理

一个能够动态注册和获取服务信息的地方,来统一管理服务名称和其对应的服务器列表信息,称之为服务配置中心。如图所示

image-20210612234403793
  • 服务提供在启动时,将其提供的服务名称、服务器地址注册到服务配置中心
  • 服务消费者通过服务配置中心来获得需要调用的服务的机器列表,通过相应的负载均衡算法,选取其中一台服务器进行调用
  • 当服务器宕机或者下线时,相应的机器需要能够动态地从服务配置中心里面移除,并通知相应地服务消费者

二、统一配置管理

主要把服务名以及服务相关的服务器IP地址注册到注册中心,在使用服务的时候,只需要根据服务名,就可以得到所有服务地址IP,然后根据一定的负载均衡策略来选择IP地址

image-20210615234836712

1、服务的注册

关于服务的注册,其实就是把服务和IP注册到ZooKeeper节点中。

  • 服务名用的是永久节点
  • 服务IP地址用的是临时节点(为后面对节点进行注册监听做铺垫)

(用端口号的不同区别不同的机器)

image-20210615235851021

CuratorUtils类提供createPersistentNode()createEphemeralNode()方法

// 创建服务名永久节点PERSISTENT
public static void createPersistentNode(CuratorFramework zkClient, String path) {
   try {
      // 永久节点已存在
      if (PERSISTENT_REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null) {
         logger.info("永久节点已经存在,永久节点是:[{}]", path);
      } else {
         // 永久节点不存在,则创建永久节点
         //eg: /MyRPC/com.whc.rpc.api.UserService
         zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
         logger.info("永久节点成功被创建,永久节点是:[{}]", path);
      }
      PERSISTENT_REGISTERED_PATH_SET.add(path);
   } catch (Exception e) {
      logger.error("创建永久节点失败[{}]", path);
   }
}

// 创建服务地址为临时节点EPHEMERAL
// 临时节点,当客户端与 Zookeeper 之间的连接或者 session 断掉时会被zk自动删除。开源 Dubbo 框架,使用的就是临时节点
// 优点: 当服务节点下线或者服务节点不可用,Zookeeper 会自动将节点地址信息从注册中心删除
public static void createEphemeralNode(CuratorFramework zkClient, String path) {
   try {
      // 临时节点已存在
      if (EPHEMERAL_REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null) {
         logger.info("临时节点已经存在,临时节点是:[{}]", path);
      } else {
         // 临时节点不存在,则创建临时节点
         //eg: /MyRPC/com.whc.rpc.api.UserService/127.0.0.1:9000
         zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
         logger.info("临时节点成功被创建,临时节点是:[{}]", path);
      }
      EPHEMERAL_REGISTERED_PATH_SET.add(path);
   } catch (Exception e) {
      logger.error("创建临时节点失败[{}]", path);
   }
}

2、服务的发现

服务的发现就是根据服务名来获取ZooKeeper节点中的IP地址

CuratorUtils类提供了getChildrenNodes()方法

// 获取一个节点下的孩子节点
public static List<String> getChildrenNodes(CuratorFramework zkClient, String rpcServiceName) {
   if (SERVICE_ADDRESS_MAP.containsKey(rpcServiceName)) {
      return SERVICE_ADDRESS_MAP.get(rpcServiceName);
   }
   List<String> result = null;
   String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
   try {
      result = zkClient.getChildren().forPath(servicePath);
      SERVICE_ADDRESS_MAP.put(rpcServiceName, result);
      // 动态发现服务节点的变化(监听),如果提供服务的服务端上下线,则重新更新服务器列表
      registerWatcher(rpcServiceName, zkClient);
   } catch (Exception e) {
      logger.error("获取节点下的孩子节点 [{}] 失败", servicePath);
   }
   return result;
}

3、测试代码

服务的注册测试

NettyTestServer:测试用Netty服务提供者

public static void main(String[] args){
   UserService userService = new UserServiceImpl();
   BlogService blogService = new BlogServiceImpl();
   // 服务端需要把自己的ip,端口给注册中心
   NettyServer server = new NettyServer("127.0.0.1", 9000, CommonSerializer.PROTOBUF_SERIALIZER);
   server.publishService(userService, UserService.class);
   server.publishService(blogService, BlogService.class);

   server.start();
}

NettyServer:Netty服务提供者

@Override
public <T> void publishService(T service, Class<T> serviceClass) {
   if(serializer == null) {
      logger.error("未设置序列化器");
      throw new RpcException(RpcError.SERIALIZER_NOT_FOUND);
   }
   // com.whc.test.UserServiceImpl,UserService.Class
   serviceProvider.addServiceProvider(service, serviceClass);
   // com.whc.test.UserService,127.0.0.1:9000
   serviceRegistry.register(serviceClass.getCanonicalName(), new InetSocketAddress(host, port));
}

ServiceRegistry:服务注册接口

/**
 * 服务注册接口
 * 注册:保存服务和地址
 * @ClassName: ServiceRegistry
 * @Author: whc
 * @Date: 2021/06/09/22:29
 */
public interface ServiceRegistry {

   /**
    * 将一个服务注册进注册表
    * @param serviceName 服务名称
    * @param inetSocketAddress 提供服务的地址
    */
   void register(String serviceName, InetSocketAddress inetSocketAddress);
}

ZKServiceRegistryImpl:服务注册实现类

public class ZKServiceRegistryImpl implements ServiceRegistry {

   @Override
   public void register(String serviceName, InetSocketAddress inetSocketAddress) {
       // /MyRPC/com.whc.test.UserService
      String servicePersistentPath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + serviceName;
       // /MyRPC/com.whc.test.UserService/127.0.0.1:9000
      String serviceEphemeralPath = servicePersistentPath + inetSocketAddress;
      CuratorFramework zkClient = CuratorUtils.getZkClient();
      // 创建服务名永久节点, 服务地址为临时节点
      CuratorUtils.createPersistentNode(zkClient, servicePersistentPath);
      CuratorUtils.createEphemeralNode(zkClient, serviceEphemeralPath);
   }

}

服务的发现测试

NettyClient:发送RpcRequest

// 获取服务地址
InetSocketAddress inetSocketAddress = serviceDiscovery.serviceDiscovery(rpcRequest.getInterfaceName());

ServiceDiscovery:服务发现接口

/**
 * 服务发现接口
 * 查询: 根据服务名查找地址
 * @ClassName: ServiceDiscovery
 * @Author: whc
 * @Date: 2021/06/13/23:52
 */
public interface ServiceDiscovery {

   /**
    * 根据服务名称查找服务实体
    * @param serviceName 服务名称
    * @return 服务实体
    */
   InetSocketAddress serviceDiscovery(String serviceName);
}

ZKServiceDiscoveryImpl:服务发现实现类

/**
 * 服务发现实现类
 * @ClassName: ZKServiceDiscoveryImpl
 * @Author: whc
 * @Date: 2021/06/14/0:57
 */
public class ZKServiceDiscoveryImpl implements ServiceDiscovery {

   private static final Logger logger = LoggerFactory.getLogger(ZKServiceDiscoveryImpl.class);

   private final LoadBalancer loadBalancer;

   public ZKServiceDiscoveryImpl() {
      this(null);
   }

   public  ZKServiceDiscoveryImpl(LoadBalancer loadBalancer) {
      if(loadBalancer == null) {
         this.loadBalancer = new RandomLoadBalance();
      } else {
         this.loadBalancer = loadBalancer;
      }
   }

   @Override
   public InetSocketAddress serviceDiscovery(String serviceName) {
      CuratorFramework zkClient = CuratorUtils.getZkClient();
       // 获取服务地址列表
      List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, serviceName);
      if (serviceUrlList == null || serviceUrlList.size() == 0) {
         throw new RpcException(RpcError.SERVICE_NOT_FOUND, serviceName);
      }

      // 负载均衡
      String targetServiceUrl = loadBalancer.balance(serviceUrlList);
      logger.info("通过负载均衡策略,获取到服务地址:[{}]", targetServiceUrl);
      String[] socketAddressArray = targetServiceUrl.split(":");
      String host = socketAddressArray[0];
      int port = Integer.parseInt(socketAddressArray[1]);
      return new InetSocketAddress(host, port);
   }
}

测试截图

开启服务9000端口,向ZooKeeper注册服务

image-20210616004440900

开启服务9001端口,向ZooKeeper注册服务

image-20210616004516342

开启服务9002端口,向ZooKeeper注册服务

image-20210616011720439

客户端向ZooKeeper获取服务地址

image-20210616011711753

三、负载均衡

常见的负载均衡策略:随机,轮询,最小连接数,一致性Hash

这里只实现了随机轮询方式的负载均衡

1、接口

负载均衡用一个接口抽象出来:

/**
 * 负载均衡接口
 * 给服务器地址列表,根据不同的负载均衡策略选择一个
 * @ClassName: LoadBalancer
 * @Author: whc
 * @Date: 2021/06/12/22:08
 */
public interface LoadBalancer {
   String balance(List<String> serviceAddresses);
}

负载均衡抽象类

public abstract class AbstractLoadBalance implements LoadBalancer {

   @Override
   public String balance(List<String> serviceAddresses) {
      if (serviceAddresses == null || serviceAddresses.size() == 0) {
         return null;
      }
      if (serviceAddresses.size() == 1) {
         return serviceAddresses.get(0);
      }
      return doSelect(serviceAddresses);
   }

   protected abstract String doSelect(List<String> serviceAddresses);
}

2、随机、轮询代码

  • 随机

    /**
     * 随机负载均衡
     * @ClassName: RandomLoadBalance
     * @Author: whc
     * @Date: 2021/06/12/22:11
     */
    public class RandomLoadBalance extends AbstractLoadBalance {
    
       @Override
       protected String doSelect(List<String> serviceAddresses) {
          return serviceAddresses.get(new Random().nextInt(serviceAddresses.size()));
       }
    
    }
    
  • 轮询

    public class RoundLoadBalance extends AbstractLoadBalance {
    
       private int index = 0;
    
       @Override
       protected String doSelect(List<String> serviceAddresses) {
          if(index >= serviceAddresses.size()) {
             index %= serviceAddresses.size();
          }
          return serviceAddresses.get(index++);
       }
    }
    

3、客户端服务发现代码

@Override
public InetSocketAddress serviceDiscovery(String serviceName) {
   CuratorFramework zkClient = CuratorUtils.getZkClient();
   // 获取服务地址列表
   List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, serviceName);
   if (serviceUrlList == null || serviceUrlList.size() == 0) {
      throw new RpcException(RpcError.SERVICE_NOT_FOUND, serviceName);
   }

   // 负载均衡
   String targetServiceUrl = loadBalancer.balance(serviceUrlList);
   logger.info("通过负载均衡策略,获取到服务地址:[{}]", targetServiceUrl);
   String[] socketAddressArray = targetServiceUrl.split(":");
   String host = socketAddressArray[0];
   int port = Integer.parseInt(socketAddressArray[1]);
   return new InetSocketAddress(host, port);
}

三、动态感知服务器状态

1、文字描述

在实际的生成环境中一般都是集群环境部署,同一个程序会部署在相同的几台服务器上,这时就可以通过负载均衡服务器去调度,但是我们并不能很快速的获知哪台服务器挂掉了,这时我们就可以使用ZooKeeper来解决这个问题。

image-20210616013238659

  • 感知上线

    当服务器启动的时候通过程序知道后会同时在zookeeper的service节点下创建一个新的短暂节点来存储当前服务器的信息。客户端通过对service节点的watch可以立马知道有新的服务器上线了

  • 感知下线

    当我们有个服务器下线后,对应的service下的短暂节点会被删除,此时watch service节点的客户端也能立马知道哪个服务器下线了,能够及时将访问列表中对应的服务器信息移除,从而实现及时感知服务器的变化。

2、代码部分实现

CuratorUtils提供:

  • createEphemeralNode()创建临时节点
  • registerWatcher()监听节点
// 创建服务地址为临时节点EPHEMERAL
// 临时节点,当客户端与 Zookeeper 之间的连接或者 session 断掉时会被zk自动删除。开源 Dubbo 框架,使用的就是临时节点
// 优点: 当服务节点下线或者服务节点不可用,Zookeeper 会自动将节点地址信息从注册中心删除
public static void createEphemeralNode(CuratorFramework zkClient, String path) {
   try {
      // 临时节点已存在
      if (EPHEMERAL_REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null) {
         logger.info("临时节点已经存在,临时节点是:[{}]", path);
      } else {
         // 临时节点不存在,则创建临时节点
         //eg: /MyRPC/com.whc.rpc.api.UserService/127.0.0.1:9000
         zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
         logger.info("临时节点成功被创建,临时节点是:[{}]", path);
      }
      EPHEMERAL_REGISTERED_PATH_SET.add(path);
   } catch (Exception e) {
      logger.error("创建临时节点失败[{}]", path);
   }
}
// 对节点进行注册监听, 用的是PathChildrenCache
private static void registerWatcher(String rpcServiceName, CuratorFramework zkClient) throws Exception {
   String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
   // 1. 创建监听对象
   PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true);

   // 2. 绑定监听器
   pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
      @Override
      public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
         // 重新获取节点的孩子节点, 即重新获取服务列表信息
         List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicePath);
         // 更新客户端本地服务缓存
         SERVICE_ADDRESS_MAP.put(rpcServiceName, serviceAddresses);
         logger.info("服务地址列表:{}", SERVICE_ADDRESS_MAP.get(rpcServiceName));
      }
   });

   // 3. 开启
   pathChildrenCache.start();
}

3、测试截图

假设提供服务的9000端口对应的机器关闭服务,由于ZooKeeper创建的是临时节点,所以断开连接后,超过一定时间后,会关闭会话,临时节点会被删除,此时监听节点的监听器会收到删除事件的信息,于是让客户端重新获取服务地址信息,同时更新客户端本地缓存服务信息。

服务器下线

如图是关闭端口为9001的服务器

image-20210616011855494

image-20210616004539096

服务器上线

重启端口为9001的服务器

image-20210616011904559

image-20210616013546838

四、总结

ZooKeeper上所形成的节点树如图所示:

image-20210616011908434

  • 服务注册与发现 & 负载均衡

    • 服务提供者在启动时,将其提供的服务名称、
      服务器地址,以节点的形式注册到服务配置中心
    • 服务消费者通过服务配置中心来获得需要调用的服务名称节点下的机器列表节点。通过负载均衡算法,选取其中一台服务器进行调用。
  • 动态感知服务器状态