Ribbon原理解析
Posted 奋斗吧_攻城狮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Ribbon原理解析相关的知识,希望对你有一定的参考价值。
一. 核心接口
ILoadBalancer
Ribbon通过ILoadBalancer接口对外提供统一的选择服务器(Server)的功能,此接口会根据不同的负载均衡策略(IRule)选择合适的Server返回给使用者。其核心方法如下:
public interface ILoadBalancer {
public void addServers(List<Server> newServers);
public Server chooseServer(Object key);
public void markServerDown(Server server);
public List<Server> getReachableServers();
public List<Server> getAllServers();
}
此接口默认实现类为ZoneAwareLoadBalancer,相关类关系图如下:
IRule
IRule是负载均衡策略的抽象,ILoadBalancer通过调用IRule的choose()方法返回Server,其核心方法如下:
public interface IRule{
public Server choose(Object key);
public void setLoadBalancer(ILoadBalancer lb);
public ILoadBalancer getLoadBalancer();
}
实现类有:
- BestAviableRule
跳过熔断的Server,在剩下的Server中选择并发请求最低的Server
ClientConfigEnabledRoundRobinRule、RoundRobinRule轮询 - RandomRule
随机选择 - RetryRule
可重试的策略,可以对其他策略进行重试,默认轮询重试 - WeightedResponseTimeRule
根据响应时间加权,响应时间越短权重越大 - AvailabilityFilteringRule
剔除因为连续链接、读失败或链接超过最大限制导致熔断的Server,在剩下读Server中进行轮询。
相关类图如下:
IPing
IPing用来检测Server是否可用,ILoadBalancer的实现类维护一个Timer每隔10s检测一次Server的可用状态,其核心方法有:
public interface IPing {
public boolean isAlive(Server server);
}
其实现类有:
IClientConfig
IClientConfig主要定义了用于初始化各种客户端和负载均衡器的配置信息,器实现类为DefaultClientConfigImpl。
二. 负载均衡的逻辑实现
Server的选择
ILoadBalancer接口的主要实现类为BaseLoadBalancer和ZoneAwareLoadBalancer,ZoneAwareLoadBalancer为BaseLoadBalancer的子类并且其也重写了chooseServer方法,ZoneAwareLoadBalancer从其名称可以看出这个实现类是和Spring Cloud的分区有关的,当分区的数量为1(默认配置)时它直接调用父类BaseLoadBalancer的chooseServer()方法,源码如下:
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
// 调用父类BaseLoadBalancer的chooseServer()方法
return super.chooseServer(key);
}
// 略
}
类BaseLoadBalancer的chooseServer()方法直接调用IRule接口的choose()方法,源码如下:
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
这里IRule的实现类为ZoneAvoidanceRule,choose()方法的实现在其父类PredicateBasedRule中,如下:
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
从上面源码可以看出,其先调用ILoadBalancer的getAllServers()方法获取所有Server列表,getAllServers()方法的实现在BaseLoadBalancer类中,此类维护了一个List类型的属性allServerList,所有Server都缓存至此集合中。获取Server列表后调用chooseRoundRobinAfterFiltering()方法返回Server对象。chooseRoundRobinAfterFiltering()方法会根据loadBalancerKey筛选出候选的Server,然后通过轮询的负载均衡策略选出Server,相关源码如下:
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo)
return current;
}
}
可以看到其轮询选择Server的策略为获取次数加1然后对Server数量取余得到。
Server的状态检测
BaseLoadBalancer类的集合allServerList缓存了所有Server信息,但是这些Server的状态有可能发生变化,比如Server不可用了,Ribbon就需要及时感知到,那么Ribbon是如何感知Server可用不可用的呢?
BaseLoadBalancer的构造函数中初始化了一个任务调度器Timer,这个调度器每隔10s执行一次PingTask任务,相关源码如下:
public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
IPing ping, IPingStrategy pingStrategy) {
this.name = name;
this.ping = ping;
this.pingStrategy = pingStrategy;
setRule(rule);
setupPingTask();
lbStats = stats;
init();
}
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
}
class PingTask extends TimerTask {
public void run() {
try {
new Pinger(pingStrategy).runPinger();
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error pinging", name, e);
}
}
}
深入Pinger和SerialPingStrategy的源码可知,最终通过NIWSDiscoveryPing这一IPing实现类判断Server是否可用,NIWSDiscoveryPing的isAlive()方法通过判断与Server关联的InstanceInfo的status是否为UP来判断Server是否可用,其isAlive()方法源码如下:
public boolean isAlive(Server server) {
boolean isAlive = true;
if (server!=null && server instanceof DiscoveryEnabledServer){
DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;
InstanceInfo instanceInfo = dServer.getInstanceInfo();
if (instanceInfo!=null){
InstanceStatus status = instanceInfo.getStatus();
if (status!=null){
// 其状态是否为UP
isAlive = status.equals(InstanceStatus.UP);
}
}
}
return isAlive;
}
三、Ribbon的使用姿势
RestTemplate + @LoadBalanced
提供一个标记@LoadBalanced的RestTemplate Bean,然后直接使用此Bean发起请求即可,如下:
@Configuration
public class Config {
@Bean
@LoadBalanced
RestTemplate restTemplate() {
// 提供一个标记@LoadBalanced的RestTemplat Bean
return new RestTemplate();
}
}
@RestController
public class HelloController {
@Resource
private RestTemplate restTemplate;
@GetMapping("/hi")
public String hi() {
// 直接使用即可
return restTemplate.getForEntity("http://Eureka-Producer/hello", String.class).getBody();
}
}
当实例化LoadBalancerAutoConfiguration时,给所有标记了@LoadBalanced的RestTemplate Bean设置了拦截器LoadBalancerInterceptor,此实例保存在了RestTemplate的父类InterceptingHttpAccessor的集合List interceptors中。RestTemplate相关类图如下:
设置拦截器LoadBalancerInterceptor源码如下:
@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
// 1. 收集到所有标记了@LoadBalanced的RestTemplate
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
// 3. 对于每一个RestTemplate执行customize()方法
customizer.customize(restTemplate);
}
}
});
}
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(
LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
}
@Configuration
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
// 2. 注入LoadBalancerInterceptor
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
// 4. customize()方法给RestTemplate设置LoadBalancerInterceptor
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
// 略
}
从上面源码可以看出LoadBalancerInterceptor的构造函数接受两个参数:LoadBalancerClient和LoadBalancerRequestFactory,LoadBalancerRequestFactory的实例在此Configuration中被注入类,而LoadBalancerClient的实例却没有。那么LoadBalancerClient的实例是在哪里实例化的呢?答案是RibbonAutoConfiguration,这个Configuration注入了LoadBalancerClient的实现类RibbonLoadBalancerClient的实例和SpringClientFactory的实例,相关源码如下:
@Bean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}
至此拦截器LoadBalancerInterceptor创建完成并且保存在了RestTemplate的集合属性中,那么RestTemplate是如何利用此拦截器的呢?当我们使用RestTemplate发起请求时最终会调用到RestTemplate的doExecute()方法,此方法会创建ClientHttpRequest对象并调用其execute()方法发起请求,源码如下:
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
ClientHttpResponse response = null;
try {
// 1. 创建ClientHttpRequest。
ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
// 2. 执行其execute()方法获取结果。
response = request.execute();
handleResponse(url, method, response);
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
}
catch (IOException ex) {
String resource = url.toString();
String query = url.getRawQuery();
resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
throw new ResourceAccessException("I/O error on " + method.name() +
" request for \\"" + resource + "\\": " + ex.getMessage(), ex);
}
finally {
if (response != null) {
response.close();
}
}
}
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
ClientHttpRequest request = getRequestFactory().createRequest(url, method);
if (logger.isDebugEnabled()) {
logger.debug("HTTP " + method.name() + " " + url);
}
return request;
}
@Override
public ClientHttpRequestFactory getRequestFactory() {
List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
if (!CollectionUtils.isEmpty(interceptors)) {
ClientHttpRequestFactory factory = this.interceptingRequestFactory;
if (factory == null) {
factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
this.interceptingRequestFactory = factory;
}
return factory;
}
else {
return super.getRequestFactory();
}
}
从上面的getRequestFactory()方法可以看到当集合interceptors不为空的时候ClientHttpRequest对象是由类InterceptingClientHttpRequestFactory的createRequest()方法创建出来的,并且集合interceptors作为参数传递到了InterceptingClientHttpRequestFactory中,深入InterceptingClientHttpRequestFactory的createRequest()方法,如下:
public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {
private final List<ClientHttpRequestInterceptor> interceptors;
public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory,
@Nullable List<ClientHttpRequestInterceptor> interceptors) {
super(requestFactory);
this.interceptors = (interceptors != null ? interceptors : Collections.emptyList());
}
@Override
protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
// 直接返回InterceptingClientHttpRequest对象。
return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
}
}
可以看到拦截器最终传递到了InterceptingClientHttpRequest中,上面说了RestTemplate的doExecute()方法创建了InterceptingClientHttpRequest对象且调用了其execute()方法获取响应结果,深入其execute()方法发现在execute()中直接调用了拦截器的intercept()方法,也即InterceptingClientHttpRequest的intercept()方法,源码如下:
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) 以上是关于Ribbon原理解析的主要内容,如果未能解决你的问题,请参考以下文章