手写RPC框架-重写服务治理和引入netty,开启1000个线程看看执行调用情况

Posted jwfy的学习分享

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手写RPC框架-重写服务治理和引入netty,开启1000个线程看看执行调用情况相关的知识,希望对你有一定的参考价值。

手写RPC框架

1、

2、

3、

原本打算写Netty替换BIO模型的,但是在写Netty中关于Channel长链接部分时,再回顾服务治理这一块的代码实在不太好(当然不意味现在的就很好),决定重写服务治理这一块的逻辑,最后验证1000个线程的执行效果,如下是涉及到改动的点:

  • 添加了logback日志模块,便于日志输出

  • 拆分了之前的服务治理,形成了服务发现和服务注册两个模块

  • 之前服务发现是细化到方法层面,现在修改成接口层面

  • 新增了zk节点监听模式,实现异步动态修改节点信息

  • netty的channel长链接模式

  • 负载均衡由之前的服务提供方ip的,改成netty的channel

在笔记中也会尽可能的阐述自己的设计思路,思路&实现都可能不那么完美,但这也是手写RPC的目的所在。当这个问题抛给你时,你是如何思考的,落实到具体代码实现,又有多少需要去妥协的。

看各种框架源码也是如此,不能为了看源码而看源码,看源码一方面能帮助我们解决实际的框架使用问题,另一方面是学习大佬的设计思路&好的架构经验,以能为我们所学习和使用。

增加logback模块

使用SLF4J标准规范,再关联绑定使用了logback,因为在 com.101tec#zkclient#0.11版本的zk中包含了log4j,为了避免冲突,所以要么移除,要么采用maven的最近原则,屏蔽zk中的日志模块,本文中则是采用了最近原则的做法,把logback放在pom文件的最顶部,如下图就是启动后的日志输出了(别忘记了往.gitignore中添加logs文件夹)

服务注册

服务注册之前是会注册服务提供方和服务调用方两者,以provider和consumer区分,但是这个新的改进中去掉了该部分逻辑,服务提供方只进行服务注册,服务调用方只进行服务发现,所以这也算一个妥协的点吧。

手写RPC框架(4)-重写服务治理和引入netty,开启1000个线程看看执行调用情况

如上图是改写之后的zk节点信息,最下面的节点路径就是存在的ip数据,是临时节点,接下来看看服务注册这一块是如何实现的。

 
   
   
 
  1. public interface ServiceRegister {


  2. /**

  3. * 服务注册

  4. * @param config

  5. */

  6. void register(BasicConfig config);


  7. /**

  8. * 优雅关闭

  9. */

  10. void close();

  11. }

保留了之前的注册接口,以便于能做到协议扩展,同时加上了close关闭操作。

 
   
   
 
  1. public class ZkServiceRegister implements ServiceRegister {


  2. private static final Logger logger = LoggerFactory.getLogger(ZkServiceRegister.class);

  3. private CuratorFramework client;

  4. private RegisterConfig registerConfig;


  5. public ZkServiceRegister(RegisterConfig registerConfig) {

  6. this.registerConfig = registerConfig;

  7. RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);


  8. this.client = CuratorFrameworkFactory

  9. .builder()

  10. .connectString(registerConfig.getZkHost())

  11. .sessionTimeoutMs(registerConfig.getSessionTimeOut())

  12. .retryPolicy(policy)

  13. .namespace(registerConfig.getZkNameSpace())

  14. .build();

  15. // 业务的根路径是 /jwfy/simple-rpc ,其他的都会默认挂载在这里


  16. this.client.start();

  17. logger.info("zk启动正常");

  18. }


  19. @Override

  20. public void register(BasicConfig config) {

  21. String interfacePath = "/" + config.getInterfaceName();

  22. try {

  23. if (this.client.checkExists().forPath(interfacePath) == null) {

  24. // 创建 服务的永久节点

  25. this.client.create()

  26. .creatingParentsIfNeeded()

  27. .withMode(CreateMode.PERSISTENT)

  28. .forPath(interfacePath);

  29. }

  30. String address = getServiceAddress(config);

  31. String path = String.format("%s/%s/%s", interfacePath, ServiceType.PROVIDER.getType(), address);

  32. // 这里强制采用了ServiceType.PROVIDER.getType(),也就是provider


  33. logger.info("注册 zk path: [" + this.registerConfig.getZkNameSpace() + path + "]");

  34. this.client.create()

  35. .creatingParentsIfNeeded()

  36. .withMode(CreateMode.EPHEMERAL)

  37. .forPath(path, "0".getBytes());

  38. // 创建临时节点,节点内的数据是0

  39. } catch (Exception e) {

  40. // 如果节点session未过期,还未被清除,再次创建则会提示节点已存在

  41. logger.error("注册zk失败, [{}]:{}", interfacePath, e.getMessage());

  42. }

  43. }


  44. @Override

  45. public void close() {

  46. this.client.close();

  47. logger.warn("zkClient关闭");

  48. }


  49. private String getServiceAddress(BasicConfig config) {

  50. return new StringBuilder().append(config.getHost()).append(":").append(config.getPort()).toString();

  51. }

  52. }

相比上一个版本,代码精简了不少,只是针对服务提供方的接口进行了一个zk节点的注册,此外添加了close操作,以便于在服务停止时,主动关闭zk节点的链接。

服务发现

 
   
   
 
  1. public interface ServiceDiscovery {


  2. /**

  3. * 获取服务的ip信息并添加zk监听

  4. * @param interfaceName

  5. * @return

  6. */

  7. List<String> get(String interfaceName);

  8. }

原本这部分功能是在服务注册中的,现在移出来了,单独弄成一个接口ServiceDiscovery,同样是为了协议拓展的功能。

 
   
   
 
  1. public class ZkServiceDiscovery implements ServiceDiscovery {


  2. private static final Logger logger = LoggerFactory.getLogger(ZkServiceDiscovery.class);

  3. private CuratorFramework client;

  4. private Map<String, List<String>> servicePathMap = new ConcurrentHashMap<>();

  5. // 存储的是interface->ip信息的映射关系


  6. public ZkServiceDiscovery(RegisterConfig registerConfig) {

  7. RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);

  8. this.client = CuratorFrameworkFactory

  9. .builder()

  10. .connectString(registerConfig.getZkHost())

  11. .sessionTimeoutMs(registerConfig.getSessionTimeOut())

  12. .retryPolicy(policy)

  13. .namespace(registerConfig.getZkNameSpace())

  14. .build();


  15. this.client.start();

  16. logger.info("zk启动正常");

  17. }


  18. @Override

  19. public List<String> get(String interfaceName){

  20. String path = String.format("/%s/%s", interfaceName, ServiceType.PROVIDER.getType());

  21. List<String> ips = null;

  22. try {

  23. ips = this.client.getChildren().forPath(path);

  24. // 先获取子节点的所有信息

  25. this.addWatcher(interfaceName, path);

  26. // 添加监听模式

  27. servicePathMap.put(path, ips);

  28. } catch (Exception e) {

  29. }

  30. return ips;

  31. }


  32. private void addWatcher(String interfaceName, String path) throws Exception {

  33. PathChildrenCache cache = new PathChildrenCache(this.client, path, true);

  34. cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

  35. cache.getListenable().addListener(new PathChildrenCacheListener() {

  36. @Override

  37. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {

  38. if (CHILD_ADDED == event.getType() || CHILD_REMOVED == event.getType()) {

  39. // 节点添加和节点移除操作

  40. String[] childPath = event.getData().getPath().split("/");

  41. String ip = childPath[childPath.length-1];

  42. PathChildrenCacheEvent.Type type = event.getType();

  43. logger.info("path:[{}],ip:[{}], type:{}", path, ip, type.toString());

  44. List<String> stringList = servicePathMap.get(path);

  45. if (stringList == null) {

  46. stringList = new ArrayList<>();

  47. servicePathMap.put(path, stringList);

  48. }


  49. if (CHILD_ADDED == type && !stringList.contains(ip)) {

  50. stringList.add(ip);

  51. ClientConnection.getInstance().connection(interfaceName, ip);

  52. // 这里有使用ClientConnection(服务连接器)去完成通知操作

  53. } else if (CHILD_REMOVED == type) {

  54. stringList.remove(ip);

  55. ClientConnection.getInstance().remove(interfaceName, ip);

  56. }

  57. }

  58. }

  59. });

  60. }

  61. }

通过zk获取对应服务的服务提供方信息的同时,添加了watcher模式以能实时感知zk节点的变化,然后把其变化信息告诉给服务调用方连接器

服务连接器

管理zk节点和netty可用channel的连接器,负载均衡也是在此处发挥作用,客户端可通过此连接器完成有效的Channel获取

 
   
   
 
  1. public class ClientConnection {


  2. private static final Logger logger = LoggerFactory.getLogger(ClientConnection.class);

  3. private SerializeProtocol serializeProtocol = new HessianSerialize();

  4. private Map<String, CopyOnWriteArrayList<IpClientHandler>> clientHandlerMap = new ConcurrentHashMap<>();

  5. private CopyOnWriteArrayList<ClientHandler> clientHandlerList = new CopyOnWriteArrayList<>();

  6. private volatile boolean flag = true;

  7. private ReentrantLock reentrantLock = new ReentrantLock();


  8. /**

  9. * 一个开关以控制对cliendhandler对获取

  10. */

  11. private Condition condition = reentrantLock.newCondition();


  12. private static class Single {

  13. private static final ClientConnection INSTANCE = new ClientConnection();

  14. }


  15. public static ClientConnection getInstance() {

  16. // 内部类的单例模式,线程安全

  17. return Single.INSTANCE;

  18. }


  19. public void connection(String interfaceName, String ip) {

  20. InetSocketAddress address = CommonUtils.parseIp(ip);

  21. EventLoopGroup work = new NioEventLoopGroup();

  22. // netty的客户端连接操作

  23. Bootstrap bootstrap = new Bootstrap();

  24. bootstrap.group(work).channel(NiosocketChannel.class)

  25. .handler(new ChannelInitializer<SocketChannel>() {

  26. @Override

  27. protected void initChannel(SocketChannel socketChannel) throws Exception {

  28. socketChannel.pipeline()

  29. .addLast(new LengthFieldPrepender(2,0,false))

  30. .addLast(new RpcEncoder(RpcRequest.class, serializeProtocol))

  31. .addLast(new RpcDecoder(RpcResponse.class, serializeProtocol))

  32. .addLast(new ClientHandler());

  33. }

  34. });

  35. ChannelFuture channelFuture = bootstrap.connect(address);

  36. channelFuture.addListener(new ChannelFutureListener() {

  37. @Override

  38. public void operationComplete(ChannelFuture channelFuture) throws Exception {

  39. if (channelFuture.isSuccess()) {

  40. ClientHandler handler = channelFuture.channel().pipeline().get(ClientHandler.class);

  41. InetSocketAddress remoteAddress = (InetSocketAddress) handler.getChannel().remoteAddress();

  42. logger.info("链接到远程服务器:{}, address:{}", handler, remoteAddress);

  43. CopyOnWriteArrayList<IpClientHandler> ips = clientHandlerMap.get(interfaceName);

  44. if (ips == null) {

  45. ips = new CopyOnWriteArrayList<IpClientHandler>();

  46. clientHandlerMap.put(interfaceName, ips);

  47. }

  48. ips.add(new IpClientHandler(ip, handler));

  49. clientHandlerList.add(handler);

  50. // 通知其他可能被阻塞的线程

  51. notifyHandler();

  52. }

  53. }

  54. });

  55. }


  56. public void remove(String interfaceName, String ip) {

  57. // 移除掉已经废弃的不可用的channel信息

  58. CopyOnWriteArrayList<IpClientHandler> ips = clientHandlerMap.get(interfaceName);

  59. if (ips != null && !ips.isEmpty()) {

  60. ips.stream().filter(x -> {

  61. return x.getIp().equals(ip);

  62. }).findFirst().ifPresent(x -> {

  63. ips.remove(x);

  64. clientHandlerList.remove(x.getHandler());

  65. });

  66. }

  67. }


  68. public ClientHandler getHandler(String interfaceName) {

  69. CopyOnWriteArrayList<IpClientHandler> ips = clientHandlerMap.get(interfaceName);

  70. int size = ips.size();

  71. while (flag && size <=0) {

  72. // 线程等待

  73. try {

  74. awaitHandler();

  75. } catch (InterruptedException e) {

  76. throw new RuntimeException("无法获取有效的handler");

  77. }

  78. ips = clientHandlerMap.get(interfaceName);

  79. size = ips.size();

  80. }

  81. if (!flag) {

  82. return null;

  83. }

  84. if (size == 1) {

  85. return ips.get(0).getHandler();

  86. }

  87. Random random = new Random();

  88. return ips.get(random.nextInt(size)).getHandler();

  89. }


  90. private void awaitHandler() throws InterruptedException {

  91. this.reentrantLock.lock();

  92. try {

  93. this.condition.await(2000, TimeUnit.MILLISECONDS);

  94. // 等待有有效的handler,不过也是有时间的,避免长时间等待

  95. } finally {

  96. this.reentrantLock.unlock();

  97. }

  98. }


  99. private void notifyHandler() {

  100. this.reentrantLock.lock();

  101. try {

  102. this.condition.signalAll();

  103. } finally {

  104. this.reentrantLock.unlock();

  105. }

  106. }


  107. public void close() {

  108. // 关闭,退出可能的条件判断、存储的channel进行主动关闭操作

  109. this.flag = true;

  110. this.clientHandlerList.forEach(x -> {

  111. x.getChannel().close();

  112. });

  113. this.clientHandlerList.clear();

  114. this.clientHandlerMap.clear();

  115. logger.warn("Netty服务端关闭了");

  116. }

  117. }

这短代码稍微比较长,而且也包含了netty客户端连接的逻辑(暂时可以不用关心netty连接的逻辑)。

那么为什么会在getHandler中添加条件判断呢?是因为netty连接有效的channel处理器是异步完成的,所以是在operationComplete异步回调方法中去通知的,确保可以获取到有效channel,而同时为了避免长时间的阻塞等待,故使用了await 2s的时间。

负载均衡也是比较简单的,如果确实没有数据则直接返回null,当有效的数据只有1个时,也没必要再做负载均衡,只有当数据超过1条时,需要进行选择操作。

实践

设置了两个服务提供方,一个服务调用方,按照先启动一个服务提供方、再启动服务调用方、最后启动另一个服务调用方的执行顺序

  • 服务提供方1启动手写RPC框架(4)-重写服务治理和引入netty,开启1000个线程看看执行调用情况

  • 服务调用方启动手写RPC框架(4)-重写服务治理和引入netty,开启1000个线程看看执行调用情况

  • 服务提供方2启动

手写RPC框架(4)-重写服务治理和引入netty,开启1000个线程看看执行调用情况

  • 服务提供方1关闭

手写RPC框架(4)-重写服务治理和引入netty,开启1000个线程看看执行调用情况

手写RPC框架(4)-重写服务治理和引入netty,开启1000个线程看看执行调用情况

日志显示的也很明显,watcher监听到了节点的移除事件,然后进行关闭channel长连接的操作

  • 客户端退出

手写RPC框架(4)-重写服务治理和引入netty,开启1000个线程看看执行调用情况

  • 负载均衡开启1000个线程

负载均衡

手写RPC框架(4)-重写服务治理和引入netty,开启1000个线程看看执行调用情况服务调用方手写RPC框架(4)-重写服务治理和引入netty,开启1000个线程看看执行调用情况

服务提供方

负载均衡在两个channel中随机轮询,开启的1000个线程调用没什么问题,不过发现了个关于netty方面的问题,这期就不再介绍,后面介绍netty的时候再聊。

总结

本期针对服务治理的重写改善了之前每次获取远程ip都需要通过zookeeper获取的方式,降低对zookeeper带来压力,采用zookeeper的watcher模式感知到节点的变化,同时本地缓存的也不是ip节点数据,而是长连接channel数据,避免了每次获取ip数据后都需要进行连接和关闭操作,可以进一步的提高效率。最后新加了优雅关闭的操作。

之前BIO模式采用线程池的方式,1000个线程很快就会把线程池打满,导致后续的任务全部被拒绝,如果在拒绝任务没有处理好还会导致服务假死的情况,而采用netty,并没有出现线程池被打满的情况,采用react模式确实能够很好的处理网络连接的处理逻辑,并且封装了NIO部分的操作。限于篇幅的缘故,netty就还是留到后面再介绍。


以上是关于手写RPC框架-重写服务治理和引入netty,开启1000个线程看看执行调用情况的主要内容,如果未能解决你的问题,请参考以下文章

#yyds干货盘点# 基于Netty,20分钟手写一个RPC框架

java 从零开始手写 RPC (02)-netty4 实现客户端和服务端

Netty_06_手写RPC基础版(实践类)

Netty_06_手写RPC基础版(实践类)

Netty_06_手写RPC基础版(实践类)

手写一个自己的 RPC 框架?