Ribbon原理解析
Posted 奋斗吧_攻城狮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Ribbon原理解析相关的知识,希望对你有一定的参考价值。
一. 核心接口
ILoadBalancer
Ribbon通过ILoadBalancer接口对外提供统一的选择服务器(Server)的功能,此接口会根据不同的负载均衡策略(IRule)选择合适的Server返回给使用者。其核心方法如下:
public interface ILoadBalancer
public void addServers(List<Server> newServers);
public Server chooseServer(Object key);
public void markServerDown(Server server);
public List<Server> getReachableServers();
public List<Server> getAllServers();
此接口默认实现类为ZoneAwareLoadBalancer,相关类关系图如下:
IRule
IRule是负载均衡策略的抽象,ILoadBalancer通过调用IRule的choose()方法返回Server,其核心方法如下:
public interface IRule
public Server choose(Object key);
public void setLoadBalancer(ILoadBalancer lb);
public ILoadBalancer getLoadBalancer();
实现类有:
- BestAviableRule
跳过熔断的Server,在剩下的Server中选择并发请求最低的Server
ClientConfigEnabledRoundRobinRule、RoundRobinRule轮询 - RandomRule
随机选择 - RetryRule
可重试的策略,可以对其他策略进行重试,默认轮询重试 - WeightedResponseTimeRule
根据响应时间加权,响应时间越短权重越大 - AvailabilityFilteringRule
剔除因为连续链接、读失败或链接超过最大限制导致熔断的Server,在剩下读Server中进行轮询。
相关类图如下:
IPing
IPing用来检测Server是否可用,ILoadBalancer的实现类维护一个Timer每隔10s检测一次Server的可用状态,其核心方法有:
public interface IPing
public boolean isAlive(Server server);
其实现类有:
IClientConfig
IClientConfig主要定义了用于初始化各种客户端和负载均衡器的配置信息,器实现类为DefaultClientConfigImpl。
二. 负载均衡的逻辑实现
Server的选择
ILoadBalancer接口的主要实现类为BaseLoadBalancer和ZoneAwareLoadBalancer,ZoneAwareLoadBalancer为BaseLoadBalancer的子类并且其也重写了chooseServer方法,ZoneAwareLoadBalancer从其名称可以看出这个实现类是和Spring Cloud的分区有关的,当分区的数量为1(默认配置)时它直接调用父类BaseLoadBalancer的chooseServer()方法,源码如下:
@Override
public Server chooseServer(Object key)
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1)
// 调用父类BaseLoadBalancer的chooseServer()方法
return super.chooseServer(key);
// 略
类BaseLoadBalancer的chooseServer()方法直接调用IRule接口的choose()方法,源码如下:
public Server chooseServer(Object key)
if (counter == null)
counter = createCounter();
counter.increment();
if (rule == null)
return null;
else
try
return rule.choose(key);
catch (Exception e)
logger.warn("LoadBalancer []: Error choosing server for key ", name, key, e);
return null;
这里IRule的实现类为ZoneAvoidanceRule,choose()方法的实现在其父类PredicateBasedRule中,如下:
@Override
public Server choose(Object key)
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent())
return server.get();
else
return null;
从上面源码可以看出,其先调用ILoadBalancer的getAllServers()方法获取所有Server列表,getAllServers()方法的实现在BaseLoadBalancer类中,此类维护了一个List类型的属性allServerList,所有Server都缓存至此集合中。获取Server列表后调用chooseRoundRobinAfterFiltering()方法返回Server对象。chooseRoundRobinAfterFiltering()方法会根据loadBalancerKey筛选出候选的Server,然后通过轮询的负载均衡策略选出Server,相关源码如下:
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey)
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0)
return Optional.absent();
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
private int incrementAndGetModulo(int modulo)
for (;;)
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo)
return current;
可以看到其轮询选择Server的策略为获取次数加1然后对Server数量取余得到。
Server的状态检测
BaseLoadBalancer类的集合allServerList缓存了所有Server信息,但是这些Server的状态有可能发生变化,比如Server不可用了,Ribbon就需要及时感知到,那么Ribbon是如何感知Server可用不可用的呢?
BaseLoadBalancer的构造函数中初始化了一个任务调度器Timer,这个调度器每隔10s执行一次PingTask任务,相关源码如下:
public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
IPing ping, IPingStrategy pingStrategy)
this.name = name;
this.ping = ping;
this.pingStrategy = pingStrategy;
setRule(rule);
setupPingTask();
lbStats = stats;
init();
void setupPingTask()
if (canSkipPing())
return;
if (lbTimer != null)
lbTimer.cancel();
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
class PingTask extends TimerTask
public void run()
try
new Pinger(pingStrategy).runPinger();
catch (Exception e)
logger.error("LoadBalancer []: Error pinging", name, e);
深入Pinger和SerialPingStrategy的源码可知,最终通过NIWSDiscoveryPing这一IPing实现类判断Server是否可用,NIWSDiscoveryPing的isAlive()方法通过判断与Server关联的InstanceInfo的status是否为UP来判断Server是否可用,其isAlive()方法源码如下:
public boolean isAlive(Server server)
boolean isAlive = true;
if (server!=null && server instanceof DiscoveryEnabledServer)
DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;
InstanceInfo instanceInfo = dServer.getInstanceInfo();
if (instanceInfo!=null)
InstanceStatus status = instanceInfo.getStatus();
if (status!=null)
// 其状态是否为UP
isAlive = status.equals(InstanceStatus.UP);
return isAlive;
三、Ribbon的使用姿势
RestTemplate + @LoadBalanced
提供一个标记@LoadBalanced的RestTemplate Bean,然后直接使用此Bean发起请求即可,如下:
@Configuration
public class Config
@Bean
@LoadBalanced
RestTemplate restTemplate()
// 提供一个标记@LoadBalanced的RestTemplat Bean
return new RestTemplate();
@RestController
public class HelloController
@Resource
private RestTemplate restTemplate;
@GetMapping("/hi")
public String hi()
// 直接使用即可
return restTemplate.getForEntity("http://Eureka-Producer/hello", String.class).getBody();
当实例化LoadBalancerAutoConfiguration时,给所有标记了@LoadBalanced的RestTemplate Bean设置了拦截器LoadBalancerInterceptor,此实例保存在了RestTemplate的父类InterceptingHttpAccessor的集合List interceptors中。RestTemplate相关类图如下:
设置拦截器LoadBalancerInterceptor源码如下:
@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration
// 1. 收集到所有标记了@LoadBalanced的RestTemplate
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers)
return () -> restTemplateCustomizers.ifAvailable(customizers ->
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates)
for (RestTemplateCustomizer customizer : customizers)
// 3. 对于每一个RestTemplate执行customize()方法
customizer.customize(restTemplate);
);
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(
LoadBalancerClient loadBalancerClient)
return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
@Configuration
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory)
// 2. 注入LoadBalancerInterceptor
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor)
return restTemplate ->
// 4. customize()方法给RestTemplate设置LoadBalancerInterceptor
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
;
// 略
从上面源码可以看出LoadBalancerInterceptor的构造函数接受两个参数:LoadBalancerClient和LoadBalancerRequestFactory,LoadBalancerRequestFactory的实例在此Configuration中被注入类,而LoadBalancerClient的实例却没有。那么LoadBalancerClient的实例是在哪里实例化的呢?答案是RibbonAutoConfiguration,这个Configuration注入了LoadBalancerClient的实现类RibbonLoadBalancerClient的实例和SpringClientFactory的实例,相关源码如下:
@Bean
public SpringClientFactory springClientFactory()
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient()
return new RibbonLoadBalancerClient(springClientFactory());
至此拦截器LoadBalancerInterceptor创建完成并且保存在了RestTemplate的集合属性中,那么RestTemplate是如何利用此拦截器的呢?当我们使用RestTemplate发起请求时最终会调用到RestTemplate的doExecute()方法,此方法会创建ClientHttpRequest对象并调用其execute()方法发起请求,源码如下:
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException
ClientHttpResponse response = null;
try
// 1. 创建ClientHttpRequest。
ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null)
requestCallback.doWithRequest(request);
// 2. 执行其execute()方法获取结果。
response = request.execute();
handleResponse(url, method, response);
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
catch (IOException ex)
String resource = url.toString();
String query = url.getRawQuery();
resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
throw new ResourceAccessException("I/O error on " + method.name() +
" request for \\"" + resource + "\\": " + ex.getMessage(), ex);
finally
if (response != null)
response.close();
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException
ClientHttpRequest request = getRequestFactory().createRequest(url, method);
if (logger.isDebugEnabled())
logger.debug("HTTP " + method.name() + " " + url);
return request;
@Override
public ClientHttpRequestFactory getRequestFactory()
List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
if (!CollectionUtils.isEmpty(interceptors))
ClientHttpRequestFactory factory = this.interceptingRequestFactory;
if (factory == null)
factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
this.interceptingRequestFactory = factory;
return factory;
else
return super.getRequestFactory();
从上面的getRequestFactory()方法可以看到当集合interceptors不为空的时候ClientHttpRequest对象是由类InterceptingClientHttpRequestFactory的createRequest()方法创建出来的,并且集合interceptors作为参数传递到了InterceptingClientHttpRequestFactory中,深入InterceptingClientHttpRequestFactory的createRequest()方法,如下:
public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper
private final List<ClientHttpRequestInterceptor> interceptors;
public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory,
@Nullable List<ClientHttpRequestInterceptor> interceptors)
super(requestFactory);
this.interceptors = (interceptors != null ? interceptors : Collections.emptyList());
@Override
protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory)
// 直接返回InterceptingClientHttpRequest对象。
return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
可以看到拦截器最终传递到了InterceptingClientHttpRequest中,上面说了RestTemplate的doExecute()方法创建了InterceptingClientHttpRequest对象且调用了其execute()方法获取响应结果,深入其execute()方法发现在execute()中直接调用了拦截器的intercept()方法,也即InterceptingClientHttpRequest的intercept()方法,源码如下:
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException
if (this.iterator.hasNext())
ClientHttpRequestInterceptor nextInterceptor = Ribbon原理解析