基于Netty手写RPC框架进阶版(下)——注册中心及服务的动态扩容
Posted 、楽.
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Netty手写RPC框架进阶版(下)——注册中心及服务的动态扩容相关的知识,希望对你有一定的参考价值。
在 基于Netty手写RPC框架进阶版(上)——注解驱动 中,我们搭建好了注解驱动的RPC框架,接下来在本文中我们就来逐步实现注册中心以及动态扩容的功能。
1. 注册中心模块
接下来我们便来引入注册中心,这里使用zookeeper来实现,首先创建netyy-rpc-registry
模块。
首先引入zk的相关依赖:
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<!-- 服务注册及发现-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>5.1.0</version>
</dependency>
</dependencies>
然后我们定义相关接口,在其中实现注册方法以及发现方法。
public interface IRegistryService
/**
* 服务注册
*
* @param serviceInfo
*/
void register(ServiceInfo serviceInfo);
/**
* 服务发现
*
* @param serviceName
* @return
*/
ServiceInfo discovery(String serviceName);
这里的话我们还需要定义一个实体类封装一些相关的服务信息。
@Data
public class ServiceInfo
private String serviceName;
private String serviceAddress;
private int servicePort;
接下来就是编写实现类了,这里我们可以自行选择使用zookeeper还是eureka,我这里使用zookeeper进行服务的注册。
整个实现类主要通过Curator
客户端及ServiceInstance
来实现服务的注册以及发现。这里的zookeeper相关代码在之前的文档里面由讲解过,可以自行查看。
public class ZookeeperRegistryService implements IRegistryService
private static final String REGISTRY_PATH = "/registry";
// curator 中提供的服务注册与发现的封装
private final ServiceDiscovery<ServiceInfo> serviceDiscovery;
// 负载均衡
private ILoadBalance<ServiceInstance<ServiceInfo>> loadBalance;
public ZookeeperRegistryService(String registryAddress) throws Exception
// 创建curator客户端
CuratorFramework client = CuratorFrameworkFactory
.newClient(registryAddress, new ExponentialBackoffRetry(1000, 3));
// 启动客户端
client.start();
// 实例化注册发现工具
JsonInstanceSerializer<ServiceInfo> serializer = new JsonInstanceSerializer<>(ServiceInfo.class);
this.serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceInfo.class)
.client(client)
.serializer(serializer)
.basePath(REGISTRY_PATH)
.build();
this.serviceDiscovery.start();
// 实例化负载均衡算法 直接写死,因为这里只有一个
this.loadBalance = new RandomLoadBalance();
@Override
public void register(ServiceInfo serviceInfo) throws Exception
System.out.println("begin registry serviceInfo to zookeeper server");
//将服务端元数据保存到注册中心上
ServiceInstance<ServiceInfo> serviceInstance = ServiceInstance.<ServiceInfo>builder()
.name(serviceInfo.getServiceName())
.address(serviceInfo.getServiceAddress())
.port(serviceInfo.getServicePort())
.payload(serviceInfo)
.build();
//注册
this.serviceDiscovery.registerService(serviceInstance);
@Override
public ServiceInfo discovery(String serviceName) throws Exception
System.out.println("begin discovery serviceInfo from zookeeper server");
Collection<ServiceInstance<ServiceInfo>> serviceInstances = this.serviceDiscovery.queryForInstances(serviceName);
//动态路由
ServiceInstance<ServiceInfo> serviceInstance = this.loadBalance.select((List<ServiceInstance<ServiceInfo>>) serviceInstances);
if (serviceInstance == null)
return null;
return serviceInstance.getPayload();
在发现服务的代码中,我们需要实现动态路由,这里我们可以实现一个随机路由的算法。
首先创建负载均衡接口。
public interface ILoadBalance<T>
T select(List<T> servers);
然后创建一个抽象类作为模板方法:
public abstract class AbstractLoadBalance implements ILoadBalance<ServiceInstance<ServiceInfo>>
// 模板方法
@Override
public ServiceInstance<ServiceInfo> select(List<ServiceInstance<ServiceInfo>> servers)
if (servers == null || servers.size() == 0)
return null;
if (servers.size() == 1)
return servers.get(0);
// 其他的由具体的子类实现
return doSelect(servers);
protected abstract ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers);
不同的路由算法由各个子类单独实现:
public class RandomLoadBalance extends AbstractLoadBalance
@Override
protected ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers)
int len = servers.size();
Random random = new Random();
return servers.get(random.nextInt(len));
服务注册这里处理完之后,我们来处理一些如何供外部调用。
首先创建类型枚举,让客户端可以选择不同的注册中心。
public enum RegistryType
ZOOKEEPER((byte) 0),
EUREKA((byte) 1);
private byte code;
RegistryType(byte code)
this.code = code;
public static RegistryType findByCode(int code)
for (RegistryType value : RegistryType.values())
if (value.code == code)
return value;
return null;
创建简易工厂类,根据不同的类型返回不同的注册服务。
public class RegistryFactory
public static IRegistryService createRegistryService(String address, RegistryType registryType)
IRegistryService service = null;
try
switch (registryType)
case EUREKA:
// todo
break;
case ZOOKEEPER:
service = new ZookeeperRegistryService(address);
break;
default:
service = new ZookeeperRegistryService(address);
break;
catch (Exception e)
e.printStackTrace();
return service;
2. 服务注册
注册中心模块完成之后,我们就可以到框架模块将相关的服务注册上去了。
首先引入相关依赖:
<dependency>
<groupId>org.example</groupId>
<artifactId>netty-rpc-registry</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
然后修改SpringRpcProviderBean
的相关代码:
首先引入注册中心,然后将相关服务注册上去即可。
public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor
private final int serverPort;
private final String serverAddress;
// 服务注册相关
private final IRegistryService registryService; // 服务注册中心
public SpringRpcProviderBean(int serverPort, IRegistryService registryService) throws UnknownHostException
this.serverPort = serverPort;
// 获取服务器的IP地址
InetAddress address = InetAddress.getLocalHost();
System.out.println("网络地址:" + address.getHostAddress());
this.serverAddress = address.getHostAddress();
this.registryService = registryService;
@Override
public void afterPropertiesSet() throws Exception
System.out.println("begin deploy netty server");
new Thread(() ->
new NettyServer(this.serverAddress, this.serverPort).startNettyServer();
).start();
// 任何Bean装载到spring 容器的时候都会回调
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException
// 只要bean声明了RemoteService注解,则需要把该服务发布到网络上 允许被调用
if (bean.getClass().isAnnotationPresent(RemoteService.class))
// 得到所有方法
Method[] methods = bean.getClass().getMethods();
//我们这里需要把具体的发布映射关系保存下来
for (Method method : methods)
String serviceName = bean.getClass().getInterfaces()[0].getName();
String key = serviceName + "." + method.getName();
BeanMethod beanMethod = new BeanMethod();
beanMethod.setBean(bean);
beanMethod.setMethod(method);
Mediator.beanMethodMap.put(key, beanMethod);
//服务注册相关
//发布到远程服务端
ServiceInfo serviceInfo = new ServiceInfo();
serviceInfo.setServiceAddress(this.serverAddress);
serviceInfo.setServicePort(this.serverPort);
serviceInfo.setServiceName(serviceName);
try
registryService.register(serviceInfo); // 注册服务
catch (Exception e)
System.out.println(serviceName + "registry failed");
e.printStackTrace();
return bean;
我们需要通过配置文件信息来控制注册中心的地址以及类型:
@Data
@ConfigurationProperties("rpc")
public class RpcServerProperties
// private String serviceAddress;
private int servicePort;
// 服务注册相关
private String registryAddress;
private byte registryType;
然后在注入SpringRpcProviderBean
的时候引入对应的注册中心。
@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcProviderAutoConfiguration
@Bean
public SpringRpcProviderBean springRpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException
// 服务注册的时候需要传递注册中心
IRegistryService registryService = RegistryFactory.createRegistryService(rpcServerProperties.getRegistryAddress(), RegistryType.findByCode(rpcServerProperties.getRegistryType()));
return new SpringRpcProviderBean(rpcServerProperties.getServicePort(), registryService);
最后在生产者端添加对应配置信息并启动项目查看。
server.port=8081
rpc.servicePort=20880
rpc.registryType=0
rpc.registryAddress=127.0.0.1:2181
打开我们的zookeeper客户端查看服务是否注册上去。
3. 消费端处理
既然我们的服务已经注册到注册中心了,那我们的消费端应该如何完善呢?还记得我们消费端Netty连接的地址和端口号都是我们在配置文件中写死的吗?如果是多个服务的话,是不是就不合理了,所以既然我们已经将服务放到注册中心了,那我们能不能直接连接到服务对应的地址上去呢?
没错,我们只需要在服务端根据名字找到对应的服务,然后将Netty客户端的地址设置成服务的地址和端口即可。
接下来逐步修改完善,首先从Netty客户端开始修改。
客户端这里我们不需要从构造方法中传入地址和端口号了,而是在发送消息的时候传入注册中心,然后注册中心根据类名获取到对应的服务,直接使用服务的地址和端口号进行连接。
public class NettyClient
private final Bootstrap bootstrap;
private final EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
// private String serviceAddress;
//
// private int servicePort;
// public NettyClient(String serviceAddress, int servicePort)
// System.out.printf("begin init Netty Client,,", serviceAddress, servicePort);
public NettyClient()
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NiosocketChannel.class)
.handler(new RpcClientInitializer());
// this.serviceAddress = serviceAddress;
// this.servicePort = servicePort;
// 发送数据包
// 通过注册服务来连接对应地址
public void sendRequest(RpcProtocol<RpcRequest> protocol, IRegistryService registryService) throws Exception
// 服务发现
ServiceInfo discovery = registryService.discovery(protocol.getContent().getClassName());
// 地址都来自注册注册中心 ,不需要通过外部传递了
final ChannelFuture future = bootstrap.connect(discovery.getServiceAddress(), discovery.getServicePort()).sync();
//监听是否连接成功
future.addListener(listener ->
if (future.isSuccess())
// System.out.printf("connect rpc server success.", this.serviceAddress);
else
// System.out.printf("connect rpc server failed. ", this.serviceAddress);
future.cause().printStackTrace();
eventLoopGroup.shutdownGracefully();
);
System.out.println("begin transfer data");
future.channel().writeAndFlush(protocol);
修改完客户端后,接下来就是对动态代理部分的代码进行修改,也是将地址和端口号改为服务的地址和端口。
首先是RpcInvokerProxy
类,同样的这里不在传递地址和端口号,改为传递注册中心。
public class RpcInvokerProxy implements InvocationHandler
// 不需要手动传入了,直接从注册中心获取
private IRegistryService registryService;
// private String host;
//
// private int port;
//
// public RpcInvokerProxy(String host, int port)
// this.host = host;
// this.port = port;
//
public RpcInvokerProxy(IRegistryService registryService)
this.registryService = registryService;
@Override
public Object invoke(Object proxy, Method method, Object[] argsGithub标星5.3K,进阶学习工作最全指南