Apollo配置中心源码分析
Posted 张小喵喵
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apollo配置中心源码分析相关的知识,希望对你有一定的参考价值。
Apollo 配置中心源码分析
Apollo是携程开源的一款分布式配置管理中心,能够集中化管理应用不同环境、不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限、流程治理等特性,适用于微服务配置管理场景。
Apollo配置发布和通知的过程
-
用户在配置中心对配置进行修改并发布
-
配置中心通知Apollo客户端有配置更新
-
Apollo客户端从配置中心拉取最新的配置、更新本地配置并通知到应用
从Apollo模块看配置发布流程
Apollo四个核心模块及其主要功能
-
ConfigService
- 提供配置获取接口
- 提供配置推送接口
- 服务于Apollo客户端
-
AdminService
- 提供配置管理接口
- 提供配置修改发布接口
- 服务于管理界面Portal
-
Client
- 为应用获取配置,支持实时更新
- 通过MetaServer获取ConfigService的服务列表
- 使用客户端软负载SLB方式调用ConfigService
-
Portal
- 配置管理界面
- 通过MetaServer获取AdminService的服务列表
- 使用客户端软负载SLB方式调用AdminService
先对整体流程进行一个梳理:
*
用户修改和发布配置是通过portal调用AdminService,把配置变更保存在数据库中。
*
客户端通过长轮询访问ConfigService实时监听配置变更。默认超时时间是90秒。如果在超时前有配置变更,就会立即返回给客户端。客户端获取变化的配置,根据进行实时更新。如果超时也没有数据变更,就放304.客户端重新发起新的请求。
*
配置服务ConfigService有一个定时任务,每秒去扫描数据库,查看是否有新变更的数据。如果有数据变更就通知客户端。
下面打算对Apollo在页面修改配置后如何通知到客户端过程的源码进行分析。
说明:
- Apollo版本为1.9.1.
- 测试用的应用appid=apollo-demo,namespace=default,env=DEV,cluster=default
主要分为一下几个部分
- 页面发布配置(新增,修改和删除)
- configService获取到新发布的配置信息
- configService通知客户端最新的配置变更
- 客户端的同步更新Spring容器中注入的@Value的值
- Apollo 如何实现让自己的配置优先级最高
一、 Apollo修改配置与发布配置
1.1页面修改配置
修改name 旧值:张三 新值:张三1
URL: http://localhost:8070/apps/apollo-demo/envs/DEV/clusters/default/namespaces/application/item
参数:
"id":1,"namespaceId":1,"key":"name","value":"张三1","lineNum":1,"dataChangeCreatedBy":"apollo","dataChangeLastModifiedBy":"apollo","dataChangeCreatedByDisplayName":"apollo","dataChangeLastModifiedByDisplayName":"apollo","dataChangeCreatedTime":"2022-02-26T12:26:12.000+0800","dataChangeLastModifiedTime":"2022-02-26T12:26:12.000+0800","tableViewOperType":"update","comment":"修改姓名"
根据上面的分析在页面修改配置是portal调用AdminService保存到数据库。所以我们到Apollo的portal模块去查找请求。Apollo使用的是restful的请求方式,它的请求格式都是/参数名1/参数值1/参数名2/参数值2/……。所以我们就去portal查询"/apps/appId/envs/env/clusters/clusterName/namespaces/namespaceName/item")
@PutMapping("/apps/appId/envs/env/clusters/clusterName/namespaces/namespaceName/item")
public void updateItem(@PathVariable String appId,
@PathVariable String env,
@PathVariable String clusterName,
@PathVariable String namespaceName,
@RequestBody ItemDTO item)
checkModel(isValidItem(item));
String username = userInfoHolder.getUser().getUserId();
item.setDataChangeLastModifiedBy(username);
configService.updateItem(appId, Env.valueOf(env), clusterName, namespaceName, item);
单个更新配置时portal通过configService.updateItem()保存数据中
public void updateItem(String appId, Env env,
String clusterName,
String namespace,
long itemId,
ItemDTO item)
restTemplate.put(env, "apps/appId/clusters/clusterName/namespaces/namespaceName/items/itemId",
item, appId, clusterName, namespace, itemId);
这里就是portal通过restTemplate调用AdminService保存配置到数据库。
AdminService 中代码如下
@PutMapping("/apps/appId/clusters/clusterName/namespaces/namespaceName/items/itemId")
public ItemDTO update(@PathVariable("appId") String appId,
@PathVariable("clusterName") String clusterName,
@PathVariable("namespaceName") String namespaceName,
@PathVariable("itemId") long itemId,
@RequestBody ItemDTO itemDTO)
Item managedEntity = itemService.findOne(itemId);
if (managedEntity == null)
throw new NotFoundException("item not found for itemId " + itemId);
Namespace namespace = namespaceService.findOne(appId, clusterName, namespaceName);
// In case someone constructs an attack scenario
if (namespace == null || namespace.getId() != managedEntity.getNamespaceId())
throw new BadRequestException("Invalid request, item and namespace do not match!");
Item entity = BeanUtils.transform(Item.class, itemDTO);
ConfigChangeContentBuilder builder = new ConfigChangeContentBuilder();
Item beforeUpdateItem = BeanUtils.transform(Item.class, managedEntity);
//protect. only value,comment,lastModifiedBy can be modified
managedEntity.setValue(entity.getValue());
managedEntity.setComment(entity.getComment());
managedEntity.setDataChangeLastModifiedBy(entity.getDataChangeLastModifiedBy());
// 保存配置到 Item表中
entity = itemService.update(managedEntity);
builder.updateItem(beforeUpdateItem, entity);
itemDTO = BeanUtils.transform(ItemDTO.class, entity);
if (builder.hasContent())
Commit commit = new Commit();
commit.setAppId(appId);
commit.setClusterName(clusterName);
commit.setNamespaceName(namespaceName);
commit.setChangeSets(builder.build());
commit.setDataChangeCreatedBy(itemDTO.getDataChangeLastModifiedBy());
commit.setDataChangeLastModifiedBy(itemDTO.getDataChangeLastModifiedBy());
// 保存发布信息到 commit 表中
commitService.save(commit);
return itemDTO;
我们看下数据item表中的配置信息。里面记录namespaceid,key,value,comment(配置的备注信息),可以根据上面信息查询到配置信息。
commit表中的信息。
每次修改配置都会新插入一条记录。其中changSets记录了这次变更的类型和内容。
每个changeSets中会按照createItems,updateItems,deleteItems分别记录了新增,修改和删除的配置项。每个分类里面又会记录具体的新增,修改和删除的具体配置信息。
1.2 查询配置列表
url:http://localhost:8070/apps/apollo-demo/envs/DEV/clusters/default/namespaces
列表分别显示了有两条配置修改了,但是没有发布。在上面标记了未发布的标签。这个是怎么判断的呢?
我们一起看下源码吧。根据上面的地址,我们去portal中查询 /apps/appId/envs/env/clusters/clusterName/namespaces
@GetMapping("/apps/appId/envs/env/clusters/clusterName/namespaces")
public List<NamespaceBO> findNamespaces(@PathVariable String appId,
@PathVariable String env,
@PathVariable String clusterName)
// 根据应用名,环境和集群查询配置列表,根据namespece返回配置列表。
List<NamespaceBO> namespaceBOs = namespaceService.findNamespaceBOs(
appId, Env.valueOf(env), clusterName);
for (NamespaceBO namespaceBO : namespaceBOs)
if (permissionValidator.shouldHideConfigToCurrentUser(
appId, env, namespaceBO.getBaseInfo().getNamespaceName()))
namespaceBO.hideItems();
return namespaceBOs;
NamespaceBO中的内容。里面包含基本信息,以及namespace内的配置列表。item中的isModified表示配置是否修改,但是没有发布。如果修改了,里面还会包含修改前后的值。
namespaceService.findNamespaceBOs()是查询该集群下所有namespaces和配置信息。现在看下namespaceService.findNamespaceBOs()的具体实现。
public List<NamespaceBO> findNamespaceBOs(String appId, Env env, String clusterName)
// 根据查询应用,环境和集群查询当前的namespaces列表,
// 查询的表 namespace jpa语句 namespaceRepository.findByAppIdAndClusterNameAndNamespaceName(appId, clusterName,namespaceName);
List<NamespaceDTO> namespaces = namespaceAPI.findNamespaceByCluster(appId, env, clusterName);
if (namespaces == null || namespaces.size() == 0)
throw new BadRequestException("namespaces not exist");
List<NamespaceBO> namespaceBOs = new LinkedList<>();
for (NamespaceDTO namespace : namespaces)
NamespaceBO namespaceBO;
try
//根据环境查询得到NamespaceBO
namespaceBO = transformNamespace2BO(env, namespace);
namespaceBOs.add(namespaceBO);
catch (Exception e)
logger.error("parse namespace error. app id:, env:, clusterName:, namespace:",
appId, env, clusterName, namespace.getNamespaceName(), e);
throw e;
return namespaceBOs;
transformNamespace2BO作用就是查询出namespace中哪些是修改的,哪些是删除的。看下面代码前的前置内容
-
apollo 对数据库的操作都是使用JPA,查询时@Where(clause = "isDeleted = 0") 默认排除了已删除的
-
-对涉及到的几张表的说明
release:每次发布生效的配置记录。里面的Configurations 是对当前生效的配置列表的JSON串。已删除的配置不会保存在里面。
item:保存配置的表。adminService中新增,修改和删除配置都是更新这张表。里面是配置的最新值,但是配置的状态可能是已发布的,也可能是已修改但未发布的。
commit:保存每次配置修改的记录,里面记录每次修改配置提交时的新增,修改和删除的配置列表。
"createItems":[],"updateItems":["oldItem":"namespaceId":1,"key":"age","value":"21","comment":"年龄修改","lineNum":2,"id":2,"isDeleted":false,"dataChangeCreatedBy":"apollo","dataChangeCreatedTime":"2022-02-26 12:26:23","dataChangeLastModifiedBy":"apollo","dataChangeLastModifiedTime":"2022-03-05 09:56:27","newItem":"namespaceId":1,"key":"age","value":"22","comment":"年龄修改2","lineNum":2,"id":2,"isDeleted":false,"dataChangeCreatedBy":"apollo","dataChangeCreatedTime":"2022-02-26 12:26:23","dataChangeLastModifiedBy":"apollo","dataChangeLastModifiedTime":"2022-03-05 21:35:48"],"deleteItems":[]
- 如何判断配置是否发布
如果在item表中存在值跟最新发布生效的配置值不一样,则可能是新增或者修改的值但是为发布
- 如何判断配置已删除
查询最后一次发布记录,获取最后一次发布配置的时间。然后查询commit表中在最后一次发布配置后,所有的commit记录。然后从里面取出所有的删除配置列表。就得到的删除但没有发布的配置列表
private NamespaceBO transformNamespace2BO(Env env, NamespaceDTO namespace)
NamespaceBO namespaceBO = new NamespaceBO();
namespaceBO.setBaseInfo(namespace);
String appId = namespace.getAppId();
String clusterName = namespace.getClusterName();
String namespaceName = namespace.getNamespaceName();
fillAppNamespaceProperties(namespaceBO);
List<ItemBO> itemBOs = new LinkedList<>();
namespaceBO.setItems(itemBOs);
//latest Release
ReleaseDTO latestRelease;
Map<String, String> releaseItems = new HashMap<>();
Map<String, ItemDTO> deletedItemDTOs = new HashMap<>();
// 查询最后一次发布记录,里面保存了最新发布的,已经生效的的所有配置信息,不包括只删除的配置,json串保存。而items中的是最新的值,但可能是已发布的页可能是未发布的配置。
// 查询的表 Release jpa语句 releaseRepository.findFirstByAppIdAndClusterNameAndNamespaceNameAndIsAbandonedFalseOrderByIdDesc(appId,clusterName,namespaceName);
latestRelease = releaseService.loadLatestRelease(appId, env, clusterName, namespaceName);
if (latestRelease != null)
releaseItems = GSON.fromJson(latestRelease.getConfigurations(), GsonType.CONFIG);
//not Release config items 开始处理未发布的配置
// 查询namespace下未删除的配置列表。列表中的内容可能有未发布的配置
// 查询的表 Item List<Item> items = itemRepository.findByNamespaceIdOrderByLineNumAsc(namespaceId);
List<ItemDTO> items = itemService.findItems(appId, env, clusterName, namespaceName);
additionalUserInfoEnrichService
.enrichAdditionalUserInfo(items, BaseDtoUserInfoEnrichedAdapter::new);
int modifiedItemCnt = 0;
for (ItemDTO itemDTO : items)
// 判断内容是否更改,并设置修改前和修改后的值。通过对比最后一次发布记录中的值与当前最新的值是否一致,如果不一致说明是修改后没有发布。
ItemBO itemBO = transformItem2BO(itemDTO, releaseItems);
if (itemBO.isModified())
modifiedItemCnt++;
itemBOs.add(itemBO);
//deleted items 开始处理已删除的配置
// 调用adminService 获取最后一次发布后的已删除的配置列表
itemService.findDeletedItems(appId, env, clusterName, namespaceName).forEach(item ->
deletedItemDTOs.put(item.getKey(),item);
);
List<ItemBO> deletedItems = parseDeletedItems(items, releaseItems, deletedItemDTOs);
itemBOs.addAll(deletedItems);
modifiedItemCnt += deletedItems.size();
namespaceBO.setItemModifiedCnt(modifiedItemCnt);
return namespaceBO;
1.3 发布配置
url:http://localhost:8070/apps/apollo-demo/envs/DEV/clusters/default/namespaces/application/releases
参数:"releaseTitle":"20220305225621-release","releaseComment":"发布删除的111","isEmergencyPublish":false
public ReleaseDTO createRelease(@PathVariable String appId,
@PathVariable String env, @PathVariable String clusterName,
@PathVariable String namespaceName, @RequestBody NamespaceReleaseModel model)
model.setAppId(appId);
model.setEnv(env);
model.setClusterName(clusterName);
model.setNamespaceName(namespaceName);
if (model.isEmergencyPublish() && !portalConfig.isEmergencyPublishAllowed(Env.valueOf(env)))
throw new BadRequestException(String.format("Env: %s is not supported emergency publish now", env));
// 插入release记录
ReleaseDTO createdRelease = releaseService.publish(model);
ConfigPublishEvent event = ConfigPublishEvent.instance();
event.withAppId(appId)
.withCluster(clusterName)
.withNamespace(namespaceName)
.withReleaseId(createdRelease.getId())
.setNormalPublishEvent(true)
.setEnv(Env.valueOf(env));
// 发出发布event
publisher.publishEvent(event);
return createdRelease;
releaseService.publish(model) 调用adminService 中的插入release记录。adminService代码如下:
@PostMapping("/apps/appId/clusters/clusterName/namespaces/namespaceName/releases")
public ReleaseDTO publish(@PathVariable("appId") String appId,
@PathVariable("clusterName") String clusterName,
@PathVariable("namespaceName") String namespaceName,
@RequestParam("name") String releaseName,
@RequestParam(name = "comment", required = false) String releaseComment,
@RequestParam("operator") String operator,
@RequestParam(name = "isEmergencyPublish", defaultValue = "false") boolean isEmergencyPublish)
Namespace namespace = namespaceService.findOne(appId, clusterName, namespaceName);
if (namespace == null)
throw new NotFoundException(String.format("Could not find namespace for %s %s %s", appId,
clusterName, namespaceName));
// 保存发布记录
Release release = releaseService.publish(namespace, releaseName, releaseComment, operator, isEmergencyPublish);
//send release message 发送发布消息
Namespace parentNamespace = namespaceService.findParentNamespace(namespace);
String messageCluster;
if (parentNamespace != null)
messageCluster = parentNamespace.getClusterName();
else
messageCluster = clusterName;
// 实际发布信息
messageSender.sendMessage(ReleaseMessageKeyGenerator.generate(appId, messageCluster, namespaceName),
Topics.APOLLO_RELEASE_TOPIC);
return BeanUtils.transform(ReleaseDTO.class, release);
保存发布记录 releaseService.publish(namespace, releaseName, releaseComment, operator, isEmergencyPublish);
*
查询namespace下所有未删除的配置列表
*
先查询最新的发布记录,获取上次最新发布记录的id。
*
组装release信息,插入到数据库中
发送消息通知ConfigService有新配置发布
Admin Service在配置发布后会往ReleaseMessage表插入一条消息记录,消息内容就是配置发布的AppId+Cluster+Namespace。这里发布后需要通知ConfigService有新的配置发布。configService获取新发布的配置信息,推送给client。
二、配置发布后的实时推送设计
上图简要描述了配置发布的大致过程:
- 用户在Portal操作配置发布
- Portal调用Admin Service的接口操作发布
- Admin Service发布配置后,发送ReleaseMessage给各个Config Service
- Config Service收到ReleaseMessage后,通知对应的客户端
####### 2.1 发送ReleaseMessage的实现方式
Admin Service在配置发布后,需要通知所有的Config Service有配置发布,从而Config Service可以通知对应的客户端来拉取最新的配置。
从概念上来看,这是一个典型的消息使用场景,Admin Service作为producer发出消息,各个Config Service作为consumer消费消息。通过一个消息组件(Message Queue)就能很好的实现Admin Service和Config Service的解耦。
在实现上,考虑到Apollo的实际使用场景,以及为了尽可能减少外部依赖,我们没有采用外部的消息中间件,而是通过数据库实现了一个简单的消息队列。
实现方式如下:
-
Admin Service在配置发布后会往ReleaseMessage表插入一条消息记录,消息内容就是配置发布的AppId+Cluster+Namespace,参见DatabaseMessageSender
-
Config Service有一个线程会每秒扫描一次ReleaseMessage表,看看是否有新的消息记录,参见ReleaseMessageScanner
-
Config Service如果发现有新的消息记录,那么就会通知到所有的消息监听器(ReleaseMessageListener),如NotificationControllerV2,消息监听器的注册过程参见ConfigServiceAutoConfiguration
-
NotificationControllerV2得到配置发布的AppId+Cluster+Namespace后,会通知对应的客户端
示意图如下:
现在看下Apollo源码中的具体实现过程
2.1 configService 定时扫描 releaseMessage
在Java配置类 ConfigServiceAutoConfiguration 中 配置了注册ReleaseMessageScanner到spring容器中。
@Bean
public ReleaseMessageScanner releaseMessageScanner()
// ReleaseMessageScanner 构造方法中初始化一个定时执行的线程池
ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
// releaseMessageScanner注册监听器列表,获取到信息发布的消息,会调用监听器列表
//0. handle release message cache
releaseMessageScanner.addMessageListener(releaseMessageServiceWithCache);
//1. handle gray release rule
releaseMessageScanner.addMessageListener(grayReleaseRulesHolder);
//2. handle server cache
releaseMessageScanner.addMessageListener(configService);
releaseMessageScanner.addMessageListener(configFileController);
//3. notify clients 通知客户端
releaseMessageScanner.addMessageListener(notificationControllerV2);
releaseMessageScanner.addMessageListener(notificationController);
return releaseMessageScanner;
看下ReleaseMessageScanner构造方法
public ReleaseMessageScanner()
listeners = Lists.newCopyOnWriteArrayList();
// 初始化一个定时每秒执行的线程池 executorService
executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory
.create("ReleaseMessageScanner", true));
missingReleaseMessages = Maps.newHashMap();
同时,我们注意到ReleaseMessageScanner实现了InitializingBean接口,通过afterPropertiesSet方法对spring进行扩展。
public class ReleaseMessageScanner implements InitializingBean
@Override
public void afterPropertiesSet() throws Exception
// 获取配置扫描的时间间隔
databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
// 初始化属性maxIdScanned,从ReleaseMessage表中获取最大ID
maxIdScanned = loadLargestMessageId();
// 定时线程池执行定时扫描ReleaseMessage的任务
executorService.scheduleWithFixedDelay(() ->
try
// 扫描ReleaseMessage最新发布,如果有新的发布配置就通知监听器
scanMessages();
catch (Throwable ex)
logger.error("Scan and send message failed", ex);
, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);
查看scanMessages()的内容
private void scanMessages()
boolean hasMoreMessages = true;
// 一致扫描,直到没有新的发布消息
while (hasMoreMessages && !Thread.currentThread().isInterrupted())
hasMoreMessages = scanAndSendMessages();
private boolean scanAndSendMessages()
//current batch is 500,每次查询比maxIdScanned大的前500条发布记录信息
List<ReleaseMessage> releaseMessages =
releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
if (CollectionUtils.isEmpty(releaseMessages))
return false;
// 通知消息监听器对发布的消息进行处理,这里主要是NotificationControllerV2通知客户端新发布的配置
fireMessageScanned(releaseMessages);
// 获取当前扫描到的最大ID
long newMaxIdScanned = releaseMessages.get(messageScanned - 1).getId();
// 更新当前的扫描到的最大ID
maxIdScanned = newMaxIdScanned;
// 是否继续循扫描
return messageScanned == 500;
private void fireMessageScanned(Iterable<ReleaseMessage> messages)
for (ReleaseMessage message : messages)
for (ReleaseMessageListener listener : listeners)
try
listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
catch (Throwable ex)
logger.error("Failed to invoke message listener ", listener.getClass(), ex);
2.2 NotificationControllerV2通知客户端配置更新
实现方式如下:
-
客户端会发起一个Http请求到Config Service的
notifications/v2
接口,也就是NotificationControllerV2,参见RemoteConfigLongPollService -
NotificationControllerV2不会立即返回结果,而是通过Spring DeferredResult把请求挂起
-
如果在60秒内没有该客户端关心的配置发布,那么会返回Http状态码304给客户端
-
如果有该客户端关心的配置发布,NotificationControllerV2会调用DeferredResult的setResult方法,传入有配置变化的namespace信息,同时该请求会立即返回。客户端从返回的结果中获取到配置变化的namespace后,会立即请求Config Service获取该namespace的最新配置。
public class NotificationControllerV2 implements ReleaseMessageListener
@Override
public void handleMessage(ReleaseMessage message, String channel)
String content = message.getMessage();
// 只处理发布消息的消息
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content))
return;
// 获取发布信息的namespace。比如content内容为 apollo-demo+default+application,namespace为application
String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);
if (Strings.isNullOrEmpty(changedNamespace))
logger.error("message format invalid - ", content);
return;
// deferredResults 就是客户端请求到ConfigService请求挂起的集合。相当于客户端的集合
if (!deferredResults.containsKey(content))
return;
//create a new list to avoid ConcurrentModificationException
List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));
// 创建配置变更的通知对象
ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
configNotification.addMessage(content, message.getId());
//do async notification if too many clients
//通知所有请求过来的客户端有新的配置发布。如果客户端太多就执行异步操作
if (results.size() > bizConfig.releaseMessageNotificationBatch())
largeNotificationBatchExecutorService.submit(() ->
for (int i = 0; i < results.size(); i++)
if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0)
try
TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
catch (InterruptedException e)
//ignore
logger.debug("Async notify ", results.get(i));
results.get(i).setResult(configNotification);
);
return;
logger.debug("Notify clients for key ", results.size(), content);
for (DeferredResultWrapper result : results)
// result.setResult()后,客户端就获得了新发布的namespace信息,挂起的请求就立即返回了。
result.setResult(configNotification);
logger.debug("Notification completed");
通过上面的操作,在用户发布新的配置后,adminService就新增一个releaseMessage到数据库中。configService定时扫描获取新增的发布消息,调用NotificationControllerV2后通知所有的客户端哪个namespace有新的配置发布。这是客户端就获取到了有发布配置的namespace,可以请求configService拉取这个namespace的最新配置到本地。
三、客户端实时通知和定时拉取配置
上图简要描述了Apollo客户端的实现原理:
- 客户端和服务端保持了一个长连接,从而能第一时间获得配置更新的推送。(通过Http Long Polling实现)
- 客户端还会定时从Apollo配置中心服务端拉取应用的最新配置。
- 这是一个fallback机制,为了防止推送机制失效导致配置不更新
- 客户端定时拉取会上报本地版本,所以一般情况下,对于定时拉取的操作,服务端都会返回304 - Not Modified
- 定时频率默认为每5分钟拉取一次,客户端也可以通过在运行时指定System Property:
apollo.refreshInterval
来覆盖,单位为分钟。
- 客户端从Apollo配置中心服务端获取到应用的最新配置后,会保存在内存中
- 客户端会把从服务端获取到的配置在本地文件系统缓存一份
- 在遇到服务不可用,或网络不通的时候,依然能从本地恢复配置
- 应用程序可以从Apollo客户端获取最新的配置、订阅配置更新通知
3.1 客户端初始化过程
Apollo是在PropertySourcesProcessor中实现了在spring初始化时获取配置和设置自动更新配置的以及实现将Apollo配置的优先级设置为最高的。BeanFactoryPostProcessor 是spring对bean实例化前的扩展接口,可以对beanDefine进行修改。
public class PropertySourcesProcessor implements BeanFactoryPostProcessor, EnvironmentAware, PriorityOrdered
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException
this.configUtil = ApolloInjector.getInstance(ConfigUtil.class);
// 初始化和拉取配置,并这是Apollo的配置是最先生效的。同时设置实时和定时从configService获取配置
initializePropertySources();
// 运行时自动更新配置
initializeAutoUpdatePropertiesFeature(beanFactory);
我们先看initializePropertySources方法。该方法就是获取应用中设置的所有namespace从Apollo获取配置信息,然后设置Apollo的配置的优先级。
Apollo是如何设置自己配置优先于别的配置文件呢?
Spring从3.1版本开始增加了ConfigurableEnvironment
和PropertySource
:
- ConfigurableEnvironment
- Spring的ApplicationContext会包含一个Environment(实现ConfigurableEnvironment接口)
- ConfigurableEnvironment自身包含了很多个PropertySource
- PropertySource
- 属性源
- 可以理解为很多个Key - Value的属性配置
在运行时的结构形如:
需要注意的是,PropertySource之间是有优先级顺序的,如果有一个Key在多个property source中都存在,那么在前面的property source优先。
所以对上图的例子:
- env.getProperty(“key1”) -> value1
- env.getProperty(“key2”) -> value2
- env.getProperty(“key3”) -> value4
在理解了上述原理后,Apollo和Spring/Spring Boot集成的手段就呼之欲出了:在应用启动阶段,Apollo从远端获取配置,然后组装成PropertySource并插入到第一个即可,如下图所示:
private void initializePropertySources()
if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_PROPERTY_SOURCE_NAME))
//already initialized
return;
CompositePropertySource composite;
if (configUtil.isPropertyNamesCacheEnabled())
composite = new CachedCompositePropertySource(PropertySourcesConstants.APOLLO_PROPERTY_SOURCE_NAME);
else
composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_PROPERTY_SOURCE_NAME);
//sort by order asc
ImmutableSortedSet<Integer> orders = ImmutableSortedSet.copyOf(NAMESPACE_NAMES.keySet());
Iterator<Integer> iterator = orders.iterator();
while (iterator.hasNext())
int order = iterator.next();
// 按照配置的Apollo的各个nameSpace的生效顺序,获取namespace
for (String namespace : NAMESPACE_NAMES.get(order))
// 重点 从Apollo的configservice获取nameSpace的配置的信息
Config config = ConfigService.getConfig(namespace);
composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
// clean up
NAMESPACE_NAMES.clear();
// add after the bootstrap property source or to the first
// 设置Apollo配置的优先级最高,在environment.getPropertySources()位置为0
if (environment.getPropertySources()
.contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME))
// ensure ApolloBootstrapPropertySources is still the first
ensureBootstrapPropertyPrecedence(environment);
environment.getPropertySources()
.addAfter(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME, composite);
else
environment.getPropertySources().addFirst(composite);
我们看下应用启动是Apollo初始化获取配置的过程 ConfigService.getConfig(namespace);
public static Config getConfig(String namespace)
// 入口 ,重点看 getConfig
return s_instance.getManager().getConfig(namespace);
看下com.ctrip.framework.apollo.internals.DefaultConfigManager#getConfig()方法
@Override
public Config getConfig(String namespace)
// 从内存中获取配置
Config config = m_configs.get(namespace);
// 双重检验
if (config == null)
synchronized (this)
config = m_configs.get(namespace);
if (config == null)
ConfigFactory factory = m_factoryManager.getFactory(namespace);
// 创建配置
config = factory.create(namespace);
// 设置nameSpace的配置
m_configs.put(namespace, config);
return config;
com.ctrip.framework.apollo.spi.DefaultConfigFactory#create
public Config create(String namespace)
// 获取namespace的格式,一般是properties
ConfigFileFormat format = determineFileFormat(namespace);
// YML和yaml文件
if (ConfigFileFormat.isPropertiesCompatible(format))
return this.createRepositoryConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
//重点 createLocalConfigRepository(namespace) 开启定时任务定时从远端拉取进行同步和长轮询
return this.createRepositoryConfig(namespace, createLocalConfigRepository(namespace));
protected Config createRepositoryConfig(String namespace,
ConfigRepository configRepository)
return new DefaultConfig(namespace, configRepository);
看下Apollo创建配置过程 com.ctrip.framework.apollo.spi.DefaultConfigFactory#create
public Config create(String namespace)
// 获取namespace的格式,一般是properties
ConfigFileFormat format = determineFileFormat(namespace);
// YML和yaml文件
if (ConfigFileFormat.isPropertiesCompatible(format))
return this.createRepositoryConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
// 重点 createLocalConfigRepository(namespace) 开启定时任务定时从远端拉取进行同步和和长轮询
return this.createRepositoryConfig(namespace, createLocalConfigRepository(namespace));
LocalFileConfigRepository createLocalConfigRepository(String namespace)
// createRemoteConfigRepository 客户端连接远端,本地配置与远端配置进行同步,启动定时拉起配置和发起对ConfigService请求获取实时的配置变更
return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
我们看下RemoteConfigRepository的构造方法
public RemoteConfigRepository(String namespace)
m_namespace = namespace;
m_configCache = new AtomicReference<>();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpClient = ApolloInjector.getInstance(HttpClient.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
m_longPollServiceDto = new AtomicReference<>();
m_remoteMessages = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
// 从远端拉取和本地配置进行同步,超时时间为5秒
this.trySync();
// 定时周期拉取最新(主动取configService拉取配置到本地,每5秒拉取一次,通过调用trySync(),)
this.schedulePeriodicRefresh();
// 长轮询实时刷新(远端推送)
this.scheduleLongPollingRefresh();
3.2 拉取远端配置同步到本地
protected boolean trySync()
try
sync();
return true;
catch (Throwable ex)
logger.warn("Sync config failed, will retry. Repository , reason: ", this.getClass(), ExceptionUtil.getDetailMessage(ex));
return false;
@Override
protected synchronized void sync()
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
try
// 获取本地内存当前的配置
ApolloConfig previous = m_configCache.get();
// 获取远端的当前配置
ApolloConfig current = loadApolloConfig();
//reference equals means HTTP 304
if (previous != current)
logger.debug("Remote Config refreshed!");
// 远端配置覆盖本地配置
m_configCache.set(current);
// 远端配置有更新时,通知本地配置变更。
this.fireRepositoryChange(m_namespace, this.getConfig());
catch (Throwable ex)
throw ex;
for (RepositoryChangeListener listener : m_listeners)
try
// 调用listener更新配置
listener.onRepositoryChange(namespace, newProperties);
catch (Throwable ex)
logger.error("Failed to invoke repository change listener ", listener.getClass(), ex);
远端配置同步后,如果有更新就会更新本地缓存的配置文件。实现就在 com.ctrip.framework.apollo.internals.LocalFileConfigRepository#onRepositoryChange
@Override
public void onRepositoryChange(String namespace, Properties newProperties)
if (newProperties.equals(m_fileProperties))
return;
Properties newFileProperties = propertiesFactory.getPropertiesInstance();
newFileProperties.putAll(newProperties);
// 更新本地缓存
updateFileProperties(newFileProperties, m_upstream.getSourceType());
this.fireRepositoryChange(namespace, newProperties);
private synchronized void updateFileProperties(Properties newProperties, ConfigSourceType sourceType)
this.m_sourceType = sourceType;
// 判断本地配置列表跟远端配置列表是否一致。Hashtable比较里面key的值是否一致
if (newProperties.equals(m_fileProperties))
return;
// 否则远端的配置覆盖本地配置
this.m_fileProperties = newProperties;
// 写入到本地的配置文件中
persistLocalCacheFile(m_baseDir, m_namespace);
现在我们一起看下client如何拉取从configService拉取配置的。主要流程如下
**
获取远端configService地址列表,负载均衡选择一个去拉取配置
**
拼接url并请求configService。url结构如下:http://192.168.100.2:8080/configs/apollo-demo/default/application?ip=192.168.100.2。com.ctrip.framework.apollo.configservice.controller.ConfigController#queryConfig
代码如下
private ApolloConfig loadApolloConfig()
// 本地拉取远端配置限流每5秒一次
if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS))
//wait at most 5 seconds
try
TimeUnit.SECONDS.sleep(5);
catch (InterruptedException e)
String appId = m_configUtil.getAppId();
String cluster = m_configUtil.getCluster();
String dataCenter = m_configUtil.getDataCenter();
String secret = m_configUtil.getAccessKeySecret();
Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
long onErrorSleepTime = 0; // 0 means no sleep
Throwable exception = null;
// 获取远端的配置服务configServie地址列表
List<ServiceDTO> configServices = getConfigServices();
String url = null;
retryLoopLabel:
for (int i = 0; i < maxRetries; i++)
List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
// 对configServie地址列表进行乱序,相当于随机选取一个服务进行调用。负载均衡
Collections.shuffle(randomConfigServices);
//Access the server which notifies the client first
// 优先访问通知过客户端的服务,放在列表的第一位
if (m_longPollServiceDto.get() != null)
randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
for (ServiceDTO configService : randomConfigServices)
if (onErrorSleepTime > 0)
logger.warn(
"Load config failed, will retry in . appId: , cluster: , namespaces: ",
onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);
try
m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
catch (InterruptedException e)
//ignore
// 组装url http://192.168.100.2:8080/configs/apollo-demo/default/application?ip=192.168.100.2 请求的是configService的 ConfigController 的/appId/clusterName/namespace:.+
url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,dataCenter, m_remoteMessages.get(), m_configCache.get());
logger.debug("Loading config from ", url);
HttpRequest request = new HttpRequest(url);
if (!StringUtils.isBlank(secret))
Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
request.setHeaders(headers);
try
// 从远端获取配置信息,
HttpResponse<ApolloConfig> response = m_httpClient.doGet(request, ApolloConfig.class);
m_configNeedForceRefresh.set(false);
m_loadConfigFailSchedulePolicy.success();
// 如果请求返回304,说明配置没变化,返回本地缓存的配置对象
if (response.getStatusCode() == 304)
logger.debug("Config server responds with 304 HTTP status code.");
return m_configCache.get();
// 否则返回实际返回的配置信息
ApolloConfig result = response.getBody();
logger.debug("Loaded config for : ", m_namespace, result);
return result;
catch (ApolloConfigStatusCodeException ex)
ApolloConfigStatusCodeException statusCodeException = ex;
//config not found
if (ex.getStatusCode() == 404)
String message = String.format(
"Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +
"please check whether the configs are released in Apollo!",
appId, cluster, m_namespace);
statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),
message);
exception = statusCodeException;
if(ex.getStatusCode() == 404)
break retryLoopLabel;
catch (Throwable ex)
exception = ex;
// if force refresh, do normal sleep, if normal config load, do exponential sleep
onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
m_loadConfigFailSchedulePolicy.fail();
String message = String.format(
"Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, url: %s",
appId, cluster, m_namespace, url);
throw new ApolloConfigException(message, exception);
接下来看下configService中如何获取配置的。主要逻辑就是configservice获取release表中最新的发布记录,得到最新发布记录的releaseKey,与客户端带来的clientSideReleaseKey比较是否一致。如果一致,说明没有配置变化,返回304.否则就把最新发布的配置返回。
@GetMapping(value = "/appId/clusterName/namespace:.+")
public ApolloConfig queryConfig(@PathVariable String appId,
@PathVariable String clusterName,
@PathVariable String namespace,
@RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "releaseKey", defaultValue = "-1") String clientSideReleaseKey,
@RequestParam(value = "ip", required = false) String clientIp,
@RequestParam(value = "messages", required = false) String messagesAsString,
HttpServletRequest request, HttpServletResponse response) throws IOException
String originalNamespace = namespace;
//strip out .properties suffix
namespace = namespaceUtil.filterNamespaceName(namespace);
//fix the character case issue, such as FX.apollo <-> fx.apollo
namespace = namespaceUtil.normalizeNamespace(appId, namespace);
if (Strings.isNullOrEmpty(clientIp))
clientIp = tryToGetClientIp(request);
ApolloNotificationMessages clientMessages = transformMessages(messagesAsString);
List<Release> releases = Lists.newLinkedList();
String appClusterNameLoaded = clusterName;
if (!ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId))
// 查找Release表中最新的发布记录
Release currentAppRelease = configService.loadConfig(appId, clientIp, appId, clusterName, namespace,
dataCenter, clientMessages);
if (currentAppRelease != null)
releases.add(currentAppRelease);
//we have cluster search process, so the cluster name might be overridden
appClusterNameLoaded = currentAppRelease.getClusterName();
//if namespace does not belong to this appId, should check if there is a public configuration
if (!namespaceBelongsToAppId(appId, namespace))
Release publicRelease = this.findPublicConfig(appId, clientIp, clusterName, namespace,
dataCenter, clientMessages);
if (Objects.nonNull(publicRelease))
releases.add(publicRelease);
if (releases.isEmpty())
response.sendError(HttpServletResponse.SC_NOT_FOUND,
String.format(
"Could not load configurations with appId: %s, clusterName: %s, namespace: %s",
appId, clusterName, originalNamespace));
Tracer.logEvent("Apollo.Config.NotFound",
assembleKey(appId, clusterName, originalNamespace, dataCenter));
return null;
auditReleases(appId, clusterName, dataCenter, clientIp, releases);
String mergedReleaseKey = releases.stream().map(Release::getReleaseKey)
.collect(Collectors.joining(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR));
// 比较客户端请求带来的clientSideReleaseKey与服务端是否一致,如果一致说明没有新的配置发布,返回304
if (mergedReleaseKey.equals(clientSideReleaseKey))
// Client side configuration is the same with server side, return 304
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
Tracer.logEvent("Apollo.Config.NotModified",
assembleKey(appId, appClusterNameLoaded, originalNamespace, dataCenter));
return null;
ApolloConfig apolloConfig = new ApolloConfig(appId, appClusterNameLoaded, originalNamespace,
mergedReleaseKey);
apolloConfig.setConfigurations(mergeReleaseConfigurations(releases));
Tracer.logEvent("Apollo.Config.Found", assembleKey(appId, appClusterNameLoaded,
originalNamespace, dataCenter));
return apolloConfig;
3.3 客户端定时拉取同步远端配置
讲完了客户端同步远端配置,我们重新回到RemoteConfigRepository的构造方法。
public RemoteConfigRepository(String namespace)
m_namespace = namespace;
m_configCache = new AtomicReference<>();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpClient = ApolloInjector.getInstance(HttpClient.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
m_longPollServiceDto = new AtomicReference<>();
m_remoteMessages = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
// 从远端拉取和本地配置进行同步,超时时间为5秒
this.trySync();
// 定时周期拉取最新(主动取configService拉取配置到本地,每5秒拉取一次,通过调用trySync(),)
this.schedulePeriodicRefresh();
// 长轮询实时刷新(远端推送)
this.scheduleLongPollingRefresh();
接下来查看定时周期拉取配置,同步本地配置,避免长轮询失败导致本地与远端配置不一致。是一种兜底的操作。
代码很简单就是启动一个定时线程池,定时调用同步配置的trySync()方法。每5分钟定时拉取一次。
private void schedulePeriodicRefresh()
logger.debug("Schedule periodic refresh with interval: ",
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
m_executorService.scheduleAtFixedRate(
new Runnable()
@Override
public void run()
logger.debug("refresh config for namespace: ", m_namespace);
trySync();
, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshIntervalTimeUnit());
3.3 客户端长轮询实时同步配置
private void scheduleLongPollingRefresh()
// submit 方法中进行长轮询
remoteConfigLongPollService.submit(m_namespace, this);
public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository)
boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
if (!m_longPollStarted.get())
startLongPolling();
return added;
private void startLongPolling()
if (!m_longPollStarted.compareAndSet(false, true))
//already started
return;
try
final String appId = m_configUtil.getAppId();
final String cluster = m_configUtil.getCluster();
final String dataCenter = m_configUtil.getDataCenter();
final String secret = m_configUtil.getAccessKeySecret();
final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
m_longPollingService.submit(new Runnable()
@Override
public void run()
// 初始化延迟2秒
if (longPollingInitialDelayInMills > 0)
try
logger.debug("Long polling will start in ms.", longPollingInitialDelayInMills);
TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
catch (InterruptedException e)
//ignore
// 定时任务长轮询查询是否存在新的变更
doLongPollingRefresh(appId, cluster, dataCenter, secret);
);
catch (Throwable ex)
m_longPollStarted.set(false);
ApolloConfigException exception =
new ApolloConfigException("Schedule long polling refresh failed", ex);
logger.warn(ExceptionUtil.getDetailMessage(exception));
private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret)
final Random random = new Random();
ServiceDTO lastServiceDto = null;
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted())
if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS))
//wait at most 5 seconds
try
TimeUnit.SECONDS.sleep(5);
catch (InterruptedException e)
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
String url = null;
try
// 如果是第一次长轮询
if (lastServiceDto == null)
// 获取远端的配置服务列表
List<ServiceDTO> configServices = getConfigServices();
// 随机取一个configServcie进行长轮询
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
// m_notifications 存储每个namespace对应的releaseMessageId
url = assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter, m_notifications);
// 长轮询url=http://192.168.100.2:8080/notifications/v2?cluster=default&appId=apollo-demo&ip=192.168.100.2¬ifications=["namespaceName":"application","notificationId":-1]
logger.debug("Long polling from ", url);
HttpRequest request = new HttpRequest(url);
//90 seconds, should be longer than server side\'s long polling timeout, which is now 60 seconds
// 客户端请求超时时间为90 秒,应该比服务器端的长轮询超时时间长,服务端现在是 60 秒
request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
if (!StringUtils.isBlank(secret))
Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
request.setHeaders(headers);
final HttpResponse<List<ApolloConfigNotification>> response =
m_httpClient.doGet(request, m_responseType);
logger.debug("Long polling response: , url: ", response.getStatusCode(), url);
// 客户端从Apollo配置中心服务端获取到应用的最新配置后,会保存在内存中
// 客户端会把从服务端获取到的配置在本地文件系统缓存一份
if (response.getStatusCode() == 200 && response.getBody() != null)
// response.getBody() ApolloConfigNotificationnamespaceName=\'application\', notificationId=971
// 根据namespace 和 notificationId 进行更新,已经更新的记录
updateNotifications(response.getBody());
updateRemoteNotifications(response.getBody());
// 重点 通知本地拉取配置
notify(lastServiceDto, response.getBody());
//try to load balance 如果配置没有变化,就设置lastServiceDto为空,下次随机获取一个服务地址列表
if (response.getStatusCode() == 304 && random.nextBoolean())
lastServiceDto = null;
m_longPollFailSchedulePolicyInSecond.success();
catch (Throwable ex)
lastServiceDto = null;
long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
logger.warn(
"Long polling failed, will retry in seconds. appId: , cluster: , namespaces: , long polling url: , reason: ",
sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
try
TimeUnit.SECONDS.sleep(sleepTimeInSecond);
catch (InterruptedException ie)
//ignore
长轮询的状态码如果是200,会根据返回的结果然本地重新拉取配置。返回结果就是namespace和releaseMessageId。
我们看下configService长轮询返回结果的代码。主要就是把请求转换为deferredResultWrapper 存储起来,对应的key是releaseMessage中的message(比如 apollo-demo+default+application),value是所有请求到该configService的请求转换的deferredResultWrapper的集合,如果configService扫描到有新的releaseMessage,就会查找根据message查询出所有的client,设置deferredResultWrapper的结果,让请求提前结束。否则请求会一直等待直到超时返回。
@GetMapping
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
@RequestParam(value = "appId") String appId,
@RequestParam(value = "cluster") String cluster,
@RequestParam(value = "notifications") String notificationsAsString,
@RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "ip", required = false) String clientIp)
List<ApolloConfigNotification> notifications = null;
try
notifications =
gson.fromJson(notificationsAsString, notificationsTypeReference);
catch (Throwable ex)
Tracer.logError(ex);
if (CollectionUtils.isEmpty(notifications))
throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);
if (CollectionUtils.isEmpty(filteredNotifications))
throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
//这里是构建DeferredResult对象,一会存储起来。NotificationControllerV2不会立即返回结果,而是通过Spring DeferredResult 把请求挂起。
//如果在60秒内没有该客户端关心的配置发布,那么会返回Http状态码304给客户端
DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli());
Set<String> namespaces = Sets.newHashSetWithExpectedSize(filteredNotifications.size());
Map<String, Long> clientSideNotifications = Maps.newHashMapWithExpectedSize(filteredNotifications.size());
for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet())
String normalizedNamespace = notificationEntry.getKey();
ApolloConfigNotification notification = notificationEntry.getValue();
namespaces.add(normalizedNamespace);
clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace))
deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
Multimap<String, String> watchedKeysMap =
watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);
Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());
/**
* 1、set deferredResult before the check, for avoid more waiting
* If the check before setting deferredResult,it may receive a notification the next time
* when method handleMessage is executed between check and set deferredResult.
* 在check之前设置deferredResult,避免更多的等待。如果在设置deferredResult之前进行check,
* 。下次在check和set deferredResult之间执行handleMessage方法时可能会收到通知。
*/
deferredResultWrapper
.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
deferredResultWrapper.onCompletion(() ->
//以上是关于Apollo配置中心源码分析的主要内容,如果未能解决你的问题,请参考以下文章
Apollo配置中心之apollo-configservice模块源码分析
Kitty中的动态线程池支持Nacos,Apollo多配置中心了