基于Netty手写RPC框架进阶版(下)——注册中心及服务的动态扩容

Posted 、楽.

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Netty手写RPC框架进阶版(下)——注册中心及服务的动态扩容相关的知识,希望对你有一定的参考价值。

基于Netty手写RPC框架进阶版(上)——注解驱动 中,我们搭建好了注解驱动的RPC框架,接下来在本文中我们就来逐步实现注册中心以及动态扩容的功能。

1. 注册中心模块

接下来我们便来引入注册中心,这里使用zookeeper来实现,首先创建netyy-rpc-registry模块。

首先引入zk的相关依赖:

<dependencies>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
<!--        服务注册及发现-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-x-discovery</artifactId>
            <version>5.1.0</version>
        </dependency>
    </dependencies>

然后我们定义相关接口,在其中实现注册方法以及发现方法。

public interface IRegistryService 

    /**
     * 服务注册
     *
     * @param serviceInfo
     */
    void register(ServiceInfo serviceInfo);

    /**
     * 服务发现
     *
     * @param serviceName
     * @return
     */
    ServiceInfo discovery(String serviceName);


这里的话我们还需要定义一个实体类封装一些相关的服务信息。

@Data
public class ServiceInfo 

    private String serviceName;

    private String serviceAddress;

    private int servicePort;


接下来就是编写实现类了,这里我们可以自行选择使用zookeeper还是eureka,我这里使用zookeeper进行服务的注册。

整个实现类主要通过Curator客户端及ServiceInstance来实现服务的注册以及发现。这里的zookeeper相关代码在之前的文档里面由讲解过,可以自行查看。

public class ZookeeperRegistryService implements IRegistryService 

    private static final String REGISTRY_PATH = "/registry";

    // curator 中提供的服务注册与发现的封装
    private final ServiceDiscovery<ServiceInfo> serviceDiscovery;

    // 负载均衡
    private ILoadBalance<ServiceInstance<ServiceInfo>> loadBalance;

    public ZookeeperRegistryService(String registryAddress) throws Exception 
        // 创建curator客户端
        CuratorFramework client = CuratorFrameworkFactory
                .newClient(registryAddress, new ExponentialBackoffRetry(1000, 3));
        // 启动客户端
        client.start();
        // 实例化注册发现工具
        JsonInstanceSerializer<ServiceInfo> serializer = new JsonInstanceSerializer<>(ServiceInfo.class);
        this.serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceInfo.class)
                .client(client)
                .serializer(serializer)
                .basePath(REGISTRY_PATH)
                .build();
        this.serviceDiscovery.start();
        // 实例化负载均衡算法 直接写死,因为这里只有一个
        this.loadBalance = new RandomLoadBalance();
    

    @Override
    public void register(ServiceInfo serviceInfo) throws Exception 
        System.out.println("begin registry serviceInfo to zookeeper server");
        //将服务端元数据保存到注册中心上
        ServiceInstance<ServiceInfo> serviceInstance = ServiceInstance.<ServiceInfo>builder()
                .name(serviceInfo.getServiceName())
                .address(serviceInfo.getServiceAddress())
                .port(serviceInfo.getServicePort())
                .payload(serviceInfo)
                .build();
        //注册
        this.serviceDiscovery.registerService(serviceInstance);
    

    @Override
    public ServiceInfo discovery(String serviceName) throws Exception 
        System.out.println("begin discovery serviceInfo from zookeeper server");
        Collection<ServiceInstance<ServiceInfo>> serviceInstances = this.serviceDiscovery.queryForInstances(serviceName);
        //动态路由
        ServiceInstance<ServiceInfo> serviceInstance = this.loadBalance.select((List<ServiceInstance<ServiceInfo>>) serviceInstances);
        if (serviceInstance == null) 
            return null;
        
        return serviceInstance.getPayload();
    

在发现服务的代码中,我们需要实现动态路由,这里我们可以实现一个随机路由的算法。

首先创建负载均衡接口。

public interface ILoadBalance<T> 

    T select(List<T> servers);


然后创建一个抽象类作为模板方法:

public abstract class AbstractLoadBalance implements ILoadBalance<ServiceInstance<ServiceInfo>> 

    // 模板方法
    @Override
    public ServiceInstance<ServiceInfo> select(List<ServiceInstance<ServiceInfo>> servers) 
        if (servers == null || servers.size() == 0) 
            return null;
        
        if (servers.size() == 1) 
            return servers.get(0);
        
        // 其他的由具体的子类实现
        return doSelect(servers);
    

    protected abstract ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers);

不同的路由算法由各个子类单独实现:

public class RandomLoadBalance extends AbstractLoadBalance 

    @Override
    protected ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers) 
        int len = servers.size();
        Random random = new Random();
        return servers.get(random.nextInt(len));
    

服务注册这里处理完之后,我们来处理一些如何供外部调用。

首先创建类型枚举,让客户端可以选择不同的注册中心。

public enum RegistryType 

    ZOOKEEPER((byte) 0),
    EUREKA((byte) 1);

    private byte code;

    RegistryType(byte code) 
        this.code = code;
    

    public static RegistryType findByCode(int code) 
        for (RegistryType value : RegistryType.values()) 
            if (value.code == code) 
                return value;
            
        
        return null;
    


创建简易工厂类,根据不同的类型返回不同的注册服务。

public class RegistryFactory 

    public static IRegistryService createRegistryService(String address, RegistryType registryType) 
        IRegistryService service = null;

        try 
            switch (registryType) 
                case EUREKA:
                    // todo
                    break;
                case ZOOKEEPER:
                    service = new ZookeeperRegistryService(address);
                    break;
                default:
                    service = new ZookeeperRegistryService(address);
                    break;
            
         catch (Exception e) 
            e.printStackTrace();
        
        return service;
    


2. 服务注册

注册中心模块完成之后,我们就可以到框架模块将相关的服务注册上去了。

首先引入相关依赖:

<dependency>
    <groupId>org.example</groupId>
    <artifactId>netty-rpc-registry</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

然后修改SpringRpcProviderBean的相关代码:

首先引入注册中心,然后将相关服务注册上去即可。

public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor 

    private final int serverPort;

    private final String serverAddress;

    // 服务注册相关
    private final IRegistryService registryService; // 服务注册中心

    public SpringRpcProviderBean(int serverPort, IRegistryService registryService) throws UnknownHostException 
        this.serverPort = serverPort;
        // 获取服务器的IP地址
        InetAddress address = InetAddress.getLocalHost();
        System.out.println("网络地址:" + address.getHostAddress());
        this.serverAddress = address.getHostAddress();
        this.registryService = registryService;
    

    @Override
    public void afterPropertiesSet() throws Exception 
        System.out.println("begin deploy netty server");
        new Thread(() -> 
            new NettyServer(this.serverAddress, this.serverPort).startNettyServer();
        ).start();
    

    // 任何Bean装载到spring 容器的时候都会回调
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException 
        // 只要bean声明了RemoteService注解,则需要把该服务发布到网络上 允许被调用
        if (bean.getClass().isAnnotationPresent(RemoteService.class)) 
            // 得到所有方法
            Method[] methods = bean.getClass().getMethods();

            //我们这里需要把具体的发布映射关系保存下来
            for (Method method : methods) 
                String serviceName = bean.getClass().getInterfaces()[0].getName();
                String key = serviceName + "." + method.getName();
                BeanMethod beanMethod = new BeanMethod();
                beanMethod.setBean(bean);
                beanMethod.setMethod(method);
                Mediator.beanMethodMap.put(key, beanMethod);

                //服务注册相关
                //发布到远程服务端
                ServiceInfo serviceInfo = new ServiceInfo();
                serviceInfo.setServiceAddress(this.serverAddress);
                serviceInfo.setServicePort(this.serverPort);
                serviceInfo.setServiceName(serviceName);
                try 
                    registryService.register(serviceInfo); // 注册服务
                 catch (Exception e) 
                    System.out.println(serviceName + "registry failed");
                    e.printStackTrace();
                
            
        
        return bean;
    

我们需要通过配置文件信息来控制注册中心的地址以及类型:

@Data
@ConfigurationProperties("rpc")
public class RpcServerProperties 

//    private String serviceAddress;

    private int servicePort;

    // 服务注册相关
    private String registryAddress;

    private byte registryType;


然后在注入SpringRpcProviderBean的时候引入对应的注册中心。

@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcProviderAutoConfiguration 

    @Bean
    public SpringRpcProviderBean springRpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException 
        // 服务注册的时候需要传递注册中心
        IRegistryService registryService = RegistryFactory.createRegistryService(rpcServerProperties.getRegistryAddress(), RegistryType.findByCode(rpcServerProperties.getRegistryType()));

        return new SpringRpcProviderBean(rpcServerProperties.getServicePort(), registryService);
    

最后在生产者端添加对应配置信息并启动项目查看。

server.port=8081

rpc.servicePort=20880

rpc.registryType=0

rpc.registryAddress=127.0.0.1:2181

打开我们的zookeeper客户端查看服务是否注册上去。

3. 消费端处理

既然我们的服务已经注册到注册中心了,那我们的消费端应该如何完善呢?还记得我们消费端Netty连接的地址和端口号都是我们在配置文件中写死的吗?如果是多个服务的话,是不是就不合理了,所以既然我们已经将服务放到注册中心了,那我们能不能直接连接到服务对应的地址上去呢?

没错,我们只需要在服务端根据名字找到对应的服务,然后将Netty客户端的地址设置成服务的地址和端口即可。

接下来逐步修改完善,首先从Netty客户端开始修改。

客户端这里我们不需要从构造方法中传入地址和端口号了,而是在发送消息的时候传入注册中心,然后注册中心根据类名获取到对应的服务,直接使用服务的地址和端口号进行连接。

public class NettyClient 

    private final Bootstrap bootstrap;

    private final EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

//    private String serviceAddress;
//
//    private int servicePort;

    //    public NettyClient(String serviceAddress, int servicePort) 
//        System.out.printf("begin init Netty Client,,", serviceAddress, servicePort);

    public NettyClient() 
        bootstrap = new Bootstrap();

        bootstrap.group(eventLoopGroup)
                .channel(NiosocketChannel.class)
                .handler(new RpcClientInitializer());

//        this.serviceAddress = serviceAddress;
//        this.servicePort = servicePort;
    

    // 发送数据包
    // 通过注册服务来连接对应地址
    public void sendRequest(RpcProtocol<RpcRequest> protocol, IRegistryService registryService) throws Exception 
        // 服务发现
        ServiceInfo discovery = registryService.discovery(protocol.getContent().getClassName());
        // 地址都来自注册注册中心 ,不需要通过外部传递了
        final ChannelFuture future = bootstrap.connect(discovery.getServiceAddress(), discovery.getServicePort()).sync();
        //监听是否连接成功
        future.addListener(listener -> 
            if (future.isSuccess()) 
//                System.out.printf("connect rpc server  success.", this.serviceAddress);
             else 
//                System.out.printf("connect rpc server  failed. ", this.serviceAddress);
                future.cause().printStackTrace();
                eventLoopGroup.shutdownGracefully();
            
        );
        System.out.println("begin transfer data");
        future.channel().writeAndFlush(protocol);
    

修改完客户端后,接下来就是对动态代理部分的代码进行修改,也是将地址和端口号改为服务的地址和端口。

首先是RpcInvokerProxy类,同样的这里不在传递地址和端口号,改为传递注册中心。

public class RpcInvokerProxy implements InvocationHandler 

    // 不需要手动传入了,直接从注册中心获取
    private IRegistryService registryService;

//    private String host;
//
//    private int port;
//
//    public RpcInvokerProxy(String host, int port) 
//        this.host = host;
//        this.port = port;
//    

    public RpcInvokerProxy(IRegistryService registryService) 
        this.registryService = registryService;
    

    @Override
    public Object invoke(Object proxy, Method method, Object[] argsGithub标星5.3K,进阶学习工作最全指南

Netty_06_手写RPC基础版(实践类)

Netty_06_手写RPC基础版(实践类)

Netty_06_手写RPC基础版(实践类)

手写dubbo 10-基于netty实现RPC

手写RPC框架-重写服务治理和引入netty,开启1000个线程看看执行调用情况