六年java老鸟细聊分布式配置中心,满满的干货指导

Posted 守夜人爱吃兔子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了六年java老鸟细聊分布式配置中心,满满的干货指导相关的知识,希望对你有一定的参考价值。

前言

系统开发过程中往往伴随着各种各样的配置,系统配置一旦设置就不再变更,业务配置随着业务发展变化需要经常变更。经常变更的配置如果写死在配置文件中,一来无法及时响应业务需求;二来会让系统失去灵活性,每次修改配置都需要重新发布应用。

基于存在的痛点问题,急需一种快速、方便修改配置的解决方案:apollo

对于apollo我们同样也存在着一些疑惑:

  • 应用程序通过什么方式来获取配置信息?
  • 配置信息发生变更后,应用程序又是如何及时进行更新的?

为了解决上面的疑惑,先从客户端看起

 

客户端

1. 主动拉取配置

应用程序客户端要想获取配置信息,可以采用主动方式去拉取

1.1 基于命名空间创建远程配置仓库

public RemoteConfigRepository(String namespace) {
    gson = new Gson();
    // 1.主动同步配置(客户端应用启动的时候执行)
    this.trySync();
    // 2.固定周期主动拉取配置(5分钟一次)
    this.schedulePeriodicRefresh();
    // 3.开启长轮询,询问服务器端是否有最新的配置,如果有最新的配置,客户端会主动去拉取最新配置;否则等待一段时间后继续询问
    this.scheduleLongPollingRefresh();
}

1.2 查询Config Service服务地址

private String assembleMetaServiceUrl() {
    // 1.获取metaServer地址(配置文件中指定)
    String domainName = m_configUtil.getMetaServerDomainName();
    // 2.应用唯一id
    String appId = m_configUtil.getAppId();
    // 3.本地ip
    String localIp = m_configUtil.getLocalIp();

    Map<String, String> queryParams = Maps.newHashMap();
    queryParams.put("appId", queryParamEscaper.escape(appId));
    if (!Strings.isNullOrEmpty(localIp)) {
        queryParams.put("ip", queryParamEscaper.escape(localIp));
    }
    // 4.请求地址
    return domainName + "/services/config?" + MAP_JOINER.join(queryParams);
}

由于meta serverconfig service部署在一起,那么通过meta server的地址 + /services/config路径就可以获取config service服务对应配置列表信息【config service可部署多台,会注册到eureka中,mata server又包装了eureka】

HttpResponse<List<ServiceDTO>> response = m_httpUtil.doGet(request, m_responseType);

得到服务配置列表后,以随机的方式主动拉取配置信息

private ApolloConfig loadApolloConfig() {
    // 1.config service服务配置列表
    List<ServiceDTO> configServices = getConfigServices();
    String url = null;
    retryLoopLabel:
    for (int i = 0; i < maxRetries; i++) {
        List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
        // 2.对服务配置列表进行洗牌
        Collections.shuffle(randomConfigServices);
        //Access the server which notifies the client first
        if (m_longPollServiceDto.get() != null) {
            randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
        }
	// 3.遍历服务配置列表
        for (ServiceDTO configService : randomConfigServices) {
            // 4.构建拉取配置url地址:configs/appId/cluster/namespace
            url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
                                         dataCenter, m_remoteMessages.get(), m_configCache.get());

            HttpRequest request = new HttpRequest(url);
            try {
		// 5.拉取配置
                HttpResponse<ApolloConfig> response = m_httpUtil.doGet(request, ApolloConfig.class);
               	// 6.客户端配置与服务器端配置一直,服务器端会返回304
                if (response.getStatusCode() == 304) {
                    logger.debug("Config server responds with 304 HTTP status code.");
                    return m_configCache.get();
                }
		// 7.获取服务器端配置
                ApolloConfig result = response.getBody();
                return result;
            } catch (ApolloConfigStatusCodeException ex) {}
        }

    }
}

1.3 更新配置

@Override
protected synchronized void sync() {
    try {
        ApolloConfig previous = m_configCache.get();
        ApolloConfig current = loadApolloConfig();

	// 1.服务器端返回最新配置
        if (previous != current) {
	    // 2.更新客户端配置
            m_configCache.set(current);
            // 3.该部分逻辑暂时省略,后文会提到
            this.fireRepositoryChange(m_namespace, this.getConfig());
        }
    } catch (Throwable ex) {} 
}

小结

客户端拉取配置过程如下:

  • 查询config service服务配置信息
  • 根据config service服务配置信息,调用configs/appId/cluster/namespace地址,拉取配置信息
  • 服务器端返回最新配置,客户端覆盖旧配置信息

 

2. 长轮询

基于上文可得知客户端应用程序在启动的过程中会主动拉取一次配置信息,之后会每隔5分钟拉取一次,如果仅仅依靠主动拉取方式,会导致配置更新延迟;因此还需要服务端在配置发生变更时通知客户端去拉取配置,可以让配置及时更新。

上文1.1章节(基于命名空间创建远程配置仓库)创建RemoteConfigRepository时有提到长轮询
this.scheduleLongPollingRefresh();

private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
    final Random random = new Random();
    ServiceDTO lastServiceDto = null;
    while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
        // 1.避免频繁请求
        if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
            //wait at most 5 seconds
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
            }
        }
        String url = null;
        try {
            if (lastServiceDto == null) {
                // 2.获取config service服务配置列表
                List<ServiceDTO> configServices = getConfigServices();
                lastServiceDto = configServices.get(random.nextInt(configServices.size()));
            }
			
            // 3.请求http://10.100.40.103:8080/notifications/v2?cluster=default&appId=boot-example-apollo&ip=10.100.40.103¬ifications=%5B%7B%22namespaceName%22%3A%22application%22%2C%22notificationId%22%3A-1%7D%5D
            url =
                assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
                                           m_notifications);

            HttpRequest request = new HttpRequest(url);
            // 4.设置长轮询请求超时时间为90s
            request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
	    // 5.发起长轮询请求
            // 5.1如果客户端的配置不是最新配置,那么服务器端会立刻返回最新配置
	    // 5.2如果客户端的配置是最新配置,那么服务器端不会立马返回结果,请求会被挂起,要么等到请求超时,要么服务器端发现有最新配置后告诉客户端
            final HttpResponse<List<ApolloConfigNotification>> response =
                m_httpUtil.doGet(request, m_responseType);

            logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
            // 6.有最新配置
            if (response.getStatusCode() == 200 && response.getBody() != null) {
                updateNotifications(response.getBody());
                updateRemoteNotifications(response.getBody());
                // 6.1客户端得知服务器端有最新配置后会主动去拉取配置
                notify(lastServiceDto, response.getBody());
            }

            // 7.请求超时后服务器端仍为发现有最新配置,开启下一次请求
            if (response.getStatusCode() == 304 && random.nextBoolean()) {
                lastServiceDto = null;
            }
        } catch (Throwable ex) {} 
    }
}

小结

  • 客户端向服务器端发起长轮询
  • 客户端要么等待超时(超时后开启下一次请求),要么等待服务器告知有最新配置(有最新配置,客户端会主动去拉取配置,也就是上文1章节主动拉取配置内容)

3. 总结

客户端通过主动拉取和长轮询两种方式获取配置信息,长轮询的目的是当服务器端有最新配置可以及时通知到客户端,客户端会立刻去拉取最新配置,可以保证配置变更及时性。

 

服务端

1. 处理长轮询

public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification() {
    List<ApolloConfigNotification> notifications = null;

    try {
        notifications =
            gson.fromJson(notificationsAsString, notificationsTypeReference);
    } catch (Throwable ex) {

    }

    Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);
    DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli());
    Set<String> namespaces = Sets.newHashSetWithExpectedSize(filteredNotifications.size());
    Map<String, Long> clientSideNotifications = Maps.newHashMapWithExpectedSize(filteredNotifications.size());


    // 1.每个key(appid + cluster + namespace)对应一个deferredResultWrapper
    for (String key : watchedKeys) {
        this.deferredResults.put(key, deferredResultWrapper);
    }

    /**
     * 2.查询是否有最新发布记录
     */
    List<ReleaseMessage> latestReleaseMessages =
        releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);

    List<ApolloConfigNotification> newNotifications =
        getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap,
                                     latestReleaseMessages);
	// 3.有最新发布记录,立刻响应客户端
    if (!CollectionUtils.isEmpty(newNotifications)) {
        deferredResultWrapper.setResult(newNotifications);
    }
    
    // 4.将客户端请求挂起
    return deferredResultWrapper.getResult();
}

 

2. 扫描最新记录并响应挂起请求

ReleaseMessageScanner实现InitializingBean接口

@Override
public void afterPropertiesSet() throws Exception {
    databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
    maxIdScanned = loadLargestMessageId();
    // 1.每1s扫描一次消息记录
    executorService.scheduleWithFixedDelay(() -> {
		scanMessages();
    }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);

}
private boolean scanAndSendMessages() {
    //current batch is 500
    List<ReleaseMessage> releaseMessages =
        releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
    if (CollectionUtils.isEmpty(releaseMessages)) {
        return false;
    }
    // 有新消息记录,响应客户端
    fireMessageScanned(releaseMessages);
    int messageScanned = releaseMessages.size();
    maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
    return messageScanned == 500;
}
@Override
public void handleMessage(ReleaseMessage message, String channel) {

    String content = message.getMessage();
    Tracer.logEvent("Apollo.LongPoll.Messages", content);

    // 1.从boot-example-apollo+default+application内容中检索namespace
    String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);
    
    // 2.从map取出DeferredResultWrapper
    List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));

    ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
    configNotification.addMessage(content, message.getId());
    
    for (DeferredResultWrapper result : results) {
        // 3.响应之前挂起的客户端请求,这样客户端收到请求后就会主动拉取配置
        result.setResult(configNotification);
    }
    logger.debug("Notification completed");
}

3. 总结

服务器端针对客户端的长轮询请求要么立刻响应(客户端会主动拉取配置),要么将请求挂起(在之后的扫描过程中发现有最新记录,则响应挂起的客户端请求,客户端会主动拉取配置)

 

@Value属性更新

客户端主动拉取配置后会触发监听器事件

protected void fireConfigChange(final ConfigChangeEvent changeEvent) {
    for (final ConfigChangeListener listener : m_listeners) {
        m_executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    // 1.触发监听器的onChange()方法
                    listener.onChange(changeEvent);
                } catch (Throwable ex) {
                    
                } finally {}
            }
        });
    }
}

 

1. 执行监听器onChange()

@Override
public void onChange(ConfigChangeEvent changeEvent) {
    // 1.获取发生变更的key
    Set<String> keys = changeEvent.changedKeys();
    for (String key : keys) {
        // 2.根据key获取对应的SpringValue,具体可参考SpringValueProcessor会将配置key与SpringValue(含有配置key、配置默认值、field对应的bean、bean名称、filed)映射在一起
        Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key);
        if (targetValues == null || targetValues.isEmpty()) {
            continue;
        }

        // 2. 更新字段值
        for (SpringValue val : targetValues) {
            updateSpringValue(val);
        }
    }
}

 

最后

给大家分享一篇一线开发大牛整理的java高并发核心编程神仙文档,里面主要包含的知识点有:多线程、线程池、内置锁、JMM、CAS、JUC、高并发设计模式、Java异步回调、CompletableFuture类等。

领取文档地址GitHub标星12.5K+Java高并发核心编程知识笔记助我提升,感觉之前学的都是渣渣

感谢阅读,文章对你有帮助的话,不妨 一键三连 支持一下吧。你们的支持是我最大的动力,祝大家早日富可敌国

以上是关于六年java老鸟细聊分布式配置中心,满满的干货指导的主要内容,如果未能解决你的问题,请参考以下文章

八年 Java 老鸟,写给 1-3 年程序员的几点建议,满满硬货指导

10年 Java 老鸟,写给 1-3 年程序员的几点建议,满满硬货指导

如何一步步成为一名架构师?满满干货指导

10年测试老鸟,写给 1-3 年程序员的几点建议,满满硬货指导

极客时间资料下载,满满干货指导

八年 “自动化测试” 老鸟,写给 3-5 年测试员的几点建议,满满硬货指导