Spring Cloud2.1-Ribbon核心源码
Posted 意犹未尽
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Cloud2.1-Ribbon核心源码相关的知识,希望对你有一定的参考价值。
Ribbon负载均衡相关类
AbstractloadBalancer
ILoadBalancer的抽象实现类
public abstract class AbstractLoadBalancer implements ILoadBalancer { //服务实例分组枚举 //• ALL: 所有服务实例。 //• STATUS_UP: 正常服务的实例。 //• STATUS_NOT_UP: 停止服务的实例。 public enum ServerGroup{ ALL, STATUS_UP, STATUS_NOT_UP } //再根据负载均衡器选择服务实例时忽略key public Server chooseServer() { return chooseServer(null); } //定义了根据分组类型来获取 不同的服务实例的列表。 public abstract List<Server> getServerList(ServerGroup serverGroup); //对象被用来存储负载均衡器中各个 服务实例当前的属性和 统计信息。 这些信息非常有用, 我们可以利用这些信息来观察负载均衡器的运行情 //况, 同时这些信息也是用来制定负载均衡策略的重要依据。 public abstract LoadBalancerStats getLoadBalancerStats(); }
BaseloadBalancer
BaseLoadBalancer 类是和ribbon 负载均衡器的基础实现类,在该类中定义了很多关 于负载均衡器相关的基础内容。
public class BaseLoadBalancer extends AbstractLoadBalancer implements PrimeConnectionListener, IClientConfigAware { private static Logger logger = LoggerFactory.getLogger(BaseLoadBalancer.class); private static final IRule DEFAULT_RULE = new RoundRobinRule(); private static final BaseLoadBalancer.SerialPingStrategy DEFAULT_PING_STRATEGY = new BaseLoadBalancer.SerialPingStrategy(); private static final String DEFAULT_NAME = "default"; private static final String PREFIX = "LoadBalancer_"; //BaseLoadBalancer服务选择是委托给IRule 这个接口表示负载均衡选择策略 protected IRule rule; //使用IPing检查服务是否有效的执行对象 内部使用线性轮训 默认实现类内部类 SerialPingStrategy protected IPingStrategy pingStrategy; //用于检查服务是否有效 默认为空需要注入 protected IPing ping; //维护所有服务 @Monitor(name = "LoadBalancer_AllServerList", type = DataSourceType.INFORMATIONAL) protected volatile List<Server> allServerList; //维护有效服务 @Monitor(name = "LoadBalancer_UpServerList", type = DataSourceType.INFORMATIONAL) protected volatile List<Server> upServerList; ...... public BaseLoadBalancer() { this.rule = DEFAULT_RULE;//默认使用new RoundRobinRule() this.pingStrategy = DEFAULT_PING_STRATEGY; this.ping = null; this.allServerList = Collections.synchronizedList(new ArrayList()); this.upServerList = Collections.synchronizedList(new ArrayList()); ..... this.ping = null; this.setRule(DEFAULT_RULE); this.setupPingTask(); } //将服务添加到服务清单 public void addServer(Server newServer) { if (newServer != null) { try { ArrayList<Server> newList = new ArrayList(); newList.addAll(this.allServerList); newList.add(newServer); this.setServersList(newList); } catch (Exception var3) { logger.error("LoadBalancer [{}]: Error adding newServer {}", new Object[]{this.name, newServer.getHost(), var3}); } } } //将服务添加到服务清单 public void addServers(List<Server> newServers) { if (newServers != null && newServers.size() > 0) { try { ArrayList<Server> newList = new ArrayList(); newList.addAll(this.allServerList); newList.addAll(newServers); this.setServersList(newList); } catch (Exception var3) { logger.error("LoadBalancer [{}]: Exception while adding Servers", this.name, var3); } } } //将服务添加到服务清单 void addServers(Object[] newServers) { if (newServers != null && newServers.length > 0) { try { ArrayList<Server> newList = new ArrayList(); newList.addAll(this.allServerList); Object[] var3 = newServers; int var4 = newServers.length; for (int var5 = 0; var5 < var4; ++var5) { Object server = var3[var5]; if (server != null) { if (server instanceof String) { server = new Server((String) server); } if (server instanceof Server) { newList.add((Server) server); } } } this.setServersList(newList); } catch (Exception var7) { logger.error("LoadBalancer [{}]: Exception while adding Servers", this.name, var7); } } } //获得所有服务 public List<Server> getAllServers() { return Collections.unmodifiableList(this.allServerList); } //根据组获得服务 public List<Server> getServerList(ServerGroup serverGroup) { switch (serverGroup) { case ALL: return this.allServerList; case STATUS_UP: return this.upServerList; case STATUS_NOT_UP: ArrayList<Server> notAvailableServers = new ArrayList(this.allServerList); ArrayList<Server> upServers = new ArrayList(this.upServerList); notAvailableServers.removeAll(upServers); return notAvailableServers; default: return new ArrayList(); } } //根据实例id获得服务 public Server chooseServer(Object key) { if (this.counter == null) { this.counter = this.createCounter(); } this.counter.increment(); if (this.rule == null) { return null; } else { try { return this.rule.choose(key); } catch (Exception var3) { logger.warn("LoadBalancer [{}]: Error choosing server for key {}", new Object[]{this.name, key, var3}); return null; } } } //标记服务无效 public void markServerDown(Server server) { if (server != null && server.isAlive()) { logger.error("LoadBalancer [{}]: markServerDown called on [{}]", this.name, server.getId()); server.setAlive(false); this.notifyServerStatusChangeListener(Collections.singleton(server)); } } } }
DynamicServerlistloadBalancer
继承BaseloadBalancer 是对BaseloadBalancer扩展 实现了再服务运行期间动态更新的能力 还增加通过过滤器 选择性的过滤一些服务的功能
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class); boolean isSecure = false; boolean useTunnel = false; // to keep track of modification of server lists protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false); //用于更新服务清单 由EurekaRibbonClientConfiguration.ribbonServerList 创建DiscoveryEnabledNIWSServerList volatile ServerList<T> serverListImpl; //从注册中心获取到服务后更新本地服务 protected volatile ServerListUpdater serverListUpdtater; //通过serverListImpl.getUpdatedListOfServers执行更新操作 protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() { @Override public void doUpdate() { updateListOfServers(); } }; public void updateListOfServers() { List<T> servers = new ArrayList<T>(); if (serverListImpl != null) { servers = serverListImpl.getUpdatedListOfServers(); LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); if (filter != null) { servers = filter.getFilteredListOfServers(servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); } } updateAllServerList(servers); } }
ServerList<T>
public interface ServerList<T extends Server> { //于获取初始化的服务实例 清单, public List<T> getInitialListOfServers(); //用于获取更新的服务实例清单 public List<T> getUpdatedListOfServers(); }
DiscoveryEnabledNIWSServerList
public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer> { public List<DiscoveryEnabledServer> getInitialListOfServers() { return this.obtainServersViaDiscovery(); } public List<DiscoveryEnabledServer> getUpdatedListOfServers() { return this.obtainServersViaDiscovery(); } private List<DiscoveryEnabledServer> obtainServersViaDiscovery() { List<DiscoveryEnabledServer> serverList = new ArrayList(); if (this.eurekaClientProvider != null && this.eurekaClientProvider.get() != null) { EurekaClient eurekaClient = (EurekaClient)this.eurekaClientProvider.get(); if (this.vipAddresses != null) { //可以理解为逻辑服务名字 比如PROVIDER String[] var3 = this.vipAddresses.split(","); int var4 = var3.length; for(int var5 = 0; var5 < var4; ++var5) { String vipAddress = var3[var5]; //通过EurekaClient从注册中心拉取服务 List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, this.isSecure, this.targetRegion); Iterator var8 = listOfInstanceInfo.iterator(); while(var8.hasNext()) { InstanceInfo ii = (InstanceInfo)var8.next(); if (ii.getStatus().equals(InstanceStatus.UP)) { if (this.shouldUseOverridePort) { if (logger.isDebugEnabled()) { logger.debug("Overriding port on client name: " + this.clientName + " to " + this.overridePort); } InstanceInfo copy = new InstanceInfo(ii); if (this.isSecure) { ii = (new Builder(copy)).setSecurePort(this.overridePort).build(); } else { ii = (new Builder(copy)).setPort(this.overridePort).build(); } } DiscoveryEnabledServer des = this.createServer(ii, this.isSecure, this.shouldUseIpAddr); serverList.add(des); } } if (serverList.size() > 0 && this.prioritizeVipAddressBasedServers) { break; } } } //最终返回拉取的服务 return serverList; } else { logger.warn("EurekaClient has not been initialized yet, returning an empty list"); return new ArrayList(); } } }
ServerListUpdater
public interface ServerListUpdater { public interface UpdateAction { void doUpdate(); } //启动服务更新器, 传入的UpdadataAction对象为更新操作的具体实现。 void start(UpdateAction updateAction); //停止服务更新器 void stop(); ////荻取最近的更新时间戳 String getLastUpdate(); //获取上一次更新到现在的时间间隔,单位为毫秒 long getDurationSinceLastUpdateMs(); ////荻取错过的更新周期数 int getNumberMissedCycles(); ////荻取核心线程数 int getCoreThreads(); }
PollingServerListUpdater动态更新服务的默认策略 采用定时任务
EurekaNotificationServerListUpdater 也是动态更新服务但是它利用 Eureka 的事件监听器来驱动服务列表的更新操作。
PollingServerListUpdater
public class PollingServerListUpdater implements ServerListUpdater { //是DynamicServerListlodBalancer的成员变量传入 public synchronized void start(final UpdateAction updateAction) { if (this.isActive.compareAndSet(false, true)) { Runnable wrapperRunnable = new Runnable() { public void run() { if (!PollingServerListUpdater.this.isActive.get()) { if (PollingServerListUpdater.this.scheduledFuture != null) { PollingServerListUpdater.this.scheduledFuture.cancel(true); } } else { try { //外部传入拉取服务 调用的DynamicServerListLoadBalancer.updateAction.doUpdate() 这个doupdate里面调用serverListImpl.getUpdatedListOfServers();拉取服务 updateAction.doUpdate(); PollingServerListUpdater.this.lastUpdated = System.currentTimeMillis(); } catch (Exception var2) { PollingServerListUpdater.logger.warn("Failed one update cycle", var2); } } } }; //开启定时任务 this.scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable, this.initialDelayMs, this.refreshIntervalMs, TimeUnit.MILLISECONDS); } else { logger.info("Already active, no-op"); } } }
ServerListFilter
DynamicServerlistloadBalancer 最终拉取服务的方法 涉及到一个filter
@VisibleForTesting public void updateListOfServers() { List<T> servers = new ArrayList<T>(); if (serverListImpl != null) { servers = serverListImpl.getUpdatedListOfServers(); LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); if (filter != null) { //对应成员变量volatile ServerListFilter<T> filter; servers = filter.getFilteredListOfServers(servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); } } updateAllServerList(servers); }
public interface ServerListFilter<T extends Server> { //主要用于实现对服务实例列表的过滤, 通过传入的 服务实例清单, 根据 一 些规则返回过滤后的服务实例清单 public List<T> getFilteredListOfServers(List<T> servers); }
AbstractServerListFilter
这是一 个抽象过滤器,在这里定义了过滤时需要 的一个重要依据对象 LoadBalancerStats, LoadBalancerStats保存了负载均衡器的属性和统计信息
public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> { //保存了负载均衡器的属性和统计信息 private volatile LoadBalancerStats stats; public AbstractServerListFilter() { } public void setLoadBalancerStats(LoadBalancerStats stats) { this.stats = stats; } public LoadBalancerStats getLoadBalancerStats() { return this.stats; } }
ZoneAffinityServerListFilter
该过滤器基于 “ 区域感知 (Zone Affinity)" 的方式实现服务实例的过滤, 也就是说, 它会根据提供服务的实例所处的区域 (Zone) 与消费者自身的所处区域 (Zone) 进行比较, 过滤掉那些不是同处 一 个区
域的实例。
public class ZoneAffinityServerListFilter<T extends Server> extends AbstractServerListFilter<T> implements IClientConfigAware { private volatile boolean zoneAffinity; private volatile boolean zoneExclusive; public List<T> getFilteredListOfServers(List<T> servers) { if (this.zone != null && (this.zoneAffinity || this.zoneExclusive) && servers != null && servers.size() > 0) { //Iterables.filter实现过滤 List<T> filteredServers = Lists.newArrayList(Iterables.filter(servers, this.zoneAffinityPredicate.getServerOnlyPredicate())); //判断是否要启用区域感知功能 调用下面shouldEnableZoneAffinity方法 if (this.shouldEnableZoneAffinity(filteredServers)) { return filteredServers; } if (this.zoneAffinity) { this.overrideCounter.increment(); } } return servers; } private boolean shouldEnableZoneAffinity(List<T> filtered) { if (!this.zoneAffinity && !this.zoneExclusive) { return false; } else if (this.zoneExclusive) { return true; } else { //保存了负载均衡器的相关统计信息 LoadBalancerStats stats = this.getLoadBalancerStats(); if (stats == null) { return this.zoneAffinity; } else { logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered); //取这些过滤后的同区域实例的基础指标(包含实例数量、断路器断开数、 活动请求数、 实例平均负载等), ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered); double loadPerServer = snapshot.getLoadPerServer(); int instanceCount = snapshot.getInstanceCount(); int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount(); /** * 当以下其中一个满足就不启用区域感知 * blackOutServerPercentage: 故障实例百分比(断路器断开数/实例数量) >=0.8。 * activeReqeustsPerServer: 实例平均负载 >=0.6 。 * availableServers: 可用实例数(实例数量 - 断路器断开数) <2。 */ if ((double)circuitBreakerTrippedCount / (double)instanceCount < this.blackOutServerPercentageThreshold.get() && loadPerServer < this.activeReqeustsPerServerThreshold.get() && instanceCount - circuitBreakerTrippedCount >= this.availableServersThreshold.get()) { return true; } else { logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}", new Object[]{(double)circuitBreakerTrippedCount / (double)instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount}); return false; } } } } }
DefaultNIWSServerListFilter
完全继承ZoneAffinityServerListFilter 是默认的过滤器
public class DefaultNIWSServerListFilter<T extends Server> extends ZoneAffinityServerListFilter<T> { public DefaultNIWSServerListFilter() { } }
ServerListSubsetFilter
ZonePreferenceServerListFilter
/** * eureka和ribbon整合额默认过滤器 */ public class ZonePreferenceServerListFilter extends ZoneAffinityServerListFilter<Server> { private String zone; public List<Server> getFilteredListOfServers(List<Server> servers) { //通过父类的区域感知获得所有服务列表 List<Server> output = super.getFilteredListOfServers(servers); //如果消费者配置了zone if (this.zone != null && output.size() == servers.size()) { List<Server> local = new ArrayList(); Iterator var4 = output.iterator(); //遍历服务列表剔除跟消费者配置zone不一致的服务 while(var4.hasNext()) { Server server = (Server)var4.next(); if (this.zone.equalsIgnoreCase(server.getZone())) { local.add(server); } } if (!local.isEmpty()) { return local; } } return output; } }
ZoneAwareLoadBalancer
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> { //重写了父类的方法 protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) { //父类会获获得所有服务 并根据zone进行分组 每个zone对应一个Zonestats super.setServerListForZones(zoneServersMap); //创建一个自己的ConcurrentHashMap 存储对应zone的负载均衡器 if (this.balancers == null) { this.balancers = new ConcurrentHashMap(); } //获得服务的迭代器 Iterator var2 = zoneServersMap.entrySet().iterator(); Entry existingLBEntry; while(var2.hasNext()) { existingLBEntry = (Entry)var2.next(); String zone = ((String)existingLBEntry.getKey()).toLowerCase(); //内部管理根据balancers 获得对应的负载均衡器(内部会创建对应的IRole规则 没有指定的话 默认Availability FilieringRule) 将对应的zone的服务放到对应的负载均衡器 this.getLoadBalancer(zone).setServersList((List)existingLBEntry.getValue()); } //遍历负载均衡器 var2 = this.balancers.entrySet().iterator(); while(var2.hasNext()) { existingLBEntry = (Entry)var2.next(); //检查对应的zone下面是否没有服务了 如果是的话就清空 if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) { ((BaseLoadBalancer)existingLBEntry.getValue()).setServersList(Collections.emptyList()); } } } //重写父类的chooseServer public Server chooseServer(Object key) { //当负载均衡器中的zone大于1的时候才执行自定义策略 否则还是用父类的 if (ENABLED.get() && this.getLoadBalancerStats().getAvailableZones().size() > 1) { Server server = null; try { LoadBalancerStats lbStats = this.getLoadBalancerStats(); //为当前负载均衡器的所有zone创建快照 Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats); logger.debug("Zone snapshots: {}", zoneSnapshot); if (this.triggeringLoad == null) { this.triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2D); } if (this.triggeringBlackoutPercentage == null) { this.triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999D); } /** * 对应zone的可用区的选择 * . 首先它会剔除符合 这些规则的 Zone区域: 所属实例数为零的 Zone 区域; Zone 区域内实例的平均负载小千零,或者实例故障率( 断路器断开次数/实例数)大 于 等于阙值(默认为0.99999)。 * . 然后根据Zone区域的实例平均负载计算出最差的Zone区域,这里的最差指的是 实例平均负载最高的Zone区域。 * . 如果在上面的过程中没有符合剔除要求的区域,同时实例最大平均负载小千闾值 (默认为20%), 就直接返回所有Zone区域为可用区域。 否则,从最坏Zone区 * 域集合中随机选择 一 个,将它从可用Zone区域集合中 剔除。 */ Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, this.triggeringLoad.get(), this.triggeringBlackoutPercentage.get()); logger.debug("Available zones: {}", availableZones); //当获取的zone区域不等于空 并且小于总数 if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) { //随机选择一个 String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones); logger.debug("Zone chosen: {}", zone); if (zone != null) { BaseLoadBalancer zoneLoadBalancer = this.getLoadBalancer(zone); //选用具体的服务实例 内部使用ZoneAvoidanceRule选择 server = zoneLoadBalancer.chooseServer(key); }以上是关于Spring Cloud2.1-Ribbon核心源码的主要内容,如果未能解决你的问题,请参考以下文章