Ribbon原理解析

Posted 奋斗吧_攻城狮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Ribbon原理解析相关的知识,希望对你有一定的参考价值。

一. 核心接口

ILoadBalancer

Ribbon通过ILoadBalancer接口对外提供统一的选择服务器(Server)的功能,此接口会根据不同的负载均衡策略(IRule)选择合适的Server返回给使用者。其核心方法如下:

public interface ILoadBalancer {

    public void addServers(List<Server> newServers);

    public Server chooseServer(Object key);
    
    public void markServerDown(Server server);
    
    public List<Server> getReachableServers();

    public List<Server> getAllServers();
}

此接口默认实现类为ZoneAwareLoadBalancer,相关类关系图如下:

IRule

IRule是负载均衡策略的抽象,ILoadBalancer通过调用IRule的choose()方法返回Server,其核心方法如下:

public interface IRule{
    
    public Server choose(Object key);
    
    public void setLoadBalancer(ILoadBalancer lb);
    
    public ILoadBalancer getLoadBalancer();    
}

实现类有:

  • BestAviableRule
    跳过熔断的Server,在剩下的Server中选择并发请求最低的Server
    ClientConfigEnabledRoundRobinRule、RoundRobinRule轮询
  • RandomRule
    随机选择
  • RetryRule
    可重试的策略,可以对其他策略进行重试,默认轮询重试
  • WeightedResponseTimeRule
    根据响应时间加权,响应时间越短权重越大
  • AvailabilityFilteringRule
    剔除因为连续链接、读失败或链接超过最大限制导致熔断的Server,在剩下读Server中进行轮询。

相关类图如下:

IPing

IPing用来检测Server是否可用,ILoadBalancer的实现类维护一个Timer每隔10s检测一次Server的可用状态,其核心方法有:

public interface IPing {
    public boolean isAlive(Server server);
}

其实现类有:

IClientConfig

IClientConfig主要定义了用于初始化各种客户端和负载均衡器的配置信息,器实现类为DefaultClientConfigImpl。

二. 负载均衡的逻辑实现

Server的选择

ILoadBalancer接口的主要实现类为BaseLoadBalancer和ZoneAwareLoadBalancer,ZoneAwareLoadBalancer为BaseLoadBalancer的子类并且其也重写了chooseServer方法,ZoneAwareLoadBalancer从其名称可以看出这个实现类是和Spring Cloud的分区有关的,当分区的数量为1(默认配置)时它直接调用父类BaseLoadBalancer的chooseServer()方法,源码如下:

@Override
public Server chooseServer(Object key) {
    if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
        // 调用父类BaseLoadBalancer的chooseServer()方法
        return super.chooseServer(key);
    }
    // 略
}

类BaseLoadBalancer的chooseServer()方法直接调用IRule接口的choose()方法,源码如下:

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

这里IRule的实现类为ZoneAvoidanceRule,choose()方法的实现在其父类PredicateBasedRule中,如下:

@Override
public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer();
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }       
}

从上面源码可以看出,其先调用ILoadBalancer的getAllServers()方法获取所有Server列表,getAllServers()方法的实现在BaseLoadBalancer类中,此类维护了一个List类型的属性allServerList,所有Server都缓存至此集合中。获取Server列表后调用chooseRoundRobinAfterFiltering()方法返回Server对象。chooseRoundRobinAfterFiltering()方法会根据loadBalancerKey筛选出候选的Server,然后通过轮询的负载均衡策略选出Server,相关源码如下:

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextIndex.get();
        int next = (current + 1) % modulo;
        if (nextIndex.compareAndSet(current, next) && current < modulo)
            return current;
    }
}

可以看到其轮询选择Server的策略为获取次数加1然后对Server数量取余得到。

Server的状态检测

BaseLoadBalancer类的集合allServerList缓存了所有Server信息,但是这些Server的状态有可能发生变化,比如Server不可用了,Ribbon就需要及时感知到,那么Ribbon是如何感知Server可用不可用的呢?
BaseLoadBalancer的构造函数中初始化了一个任务调度器Timer,这个调度器每隔10s执行一次PingTask任务,相关源码如下:

public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
            IPing ping, IPingStrategy pingStrategy) {
    
    this.name = name;
    this.ping = ping;
    this.pingStrategy = pingStrategy;
    setRule(rule);
    setupPingTask();
    lbStats = stats;
    init();
}
    
void setupPingTask() {
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}

class PingTask extends TimerTask {
    public void run() {
        try {
            new Pinger(pingStrategy).runPinger();
        } catch (Exception e) {
            logger.error("LoadBalancer [{}]: Error pinging", name, e);
        }
    }
}

深入Pinger和SerialPingStrategy的源码可知,最终通过NIWSDiscoveryPing这一IPing实现类判断Server是否可用,NIWSDiscoveryPing的isAlive()方法通过判断与Server关联的InstanceInfo的status是否为UP来判断Server是否可用,其isAlive()方法源码如下:

public boolean isAlive(Server server) {
    boolean isAlive = true;
    if (server!=null && server instanceof DiscoveryEnabledServer){
        DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;                
        InstanceInfo instanceInfo = dServer.getInstanceInfo();
        if (instanceInfo!=null){                    
            InstanceStatus status = instanceInfo.getStatus();
            if (status!=null){
                // 其状态是否为UP
                isAlive = status.equals(InstanceStatus.UP);
            }
        }
    }
    return isAlive;
}

三、Ribbon的使用姿势

RestTemplate + @LoadBalanced

提供一个标记@LoadBalanced的RestTemplate Bean,然后直接使用此Bean发起请求即可,如下:

@Configuration
public class Config {

    @Bean
    @LoadBalanced
    RestTemplate restTemplate() {
        // 提供一个标记@LoadBalanced的RestTemplat Bean
        return new RestTemplate();
    }
}

@RestController
public class HelloController {

    @Resource
    private RestTemplate restTemplate;
    
    @GetMapping("/hi")
    public String hi() {
        // 直接使用即可
        return restTemplate.getForEntity("http://Eureka-Producer/hello", String.class).getBody();
    }
}

当实例化LoadBalancerAutoConfiguration时,给所有标记了@LoadBalanced的RestTemplate Bean设置了拦截器LoadBalancerInterceptor,此实例保存在了RestTemplate的父类InterceptingHttpAccessor的集合List interceptors中。RestTemplate相关类图如下:

设置拦截器LoadBalancerInterceptor源码如下:

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

    // 1. 收集到所有标记了@LoadBalanced的RestTemplate
    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();

    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
            final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
        return () -> restTemplateCustomizers.ifAvailable(customizers -> {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    // 3. 对于每一个RestTemplate执行customize()方法
                    customizer.customize(restTemplate);
                }
            }
        });
    }

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(
            LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
    }

    @Configuration
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {
        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            // 2. 注入LoadBalancerInterceptor
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                // 4. customize()方法给RestTemplate设置LoadBalancerInterceptor
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }
    }
    // 略
}

从上面源码可以看出LoadBalancerInterceptor的构造函数接受两个参数:LoadBalancerClient和LoadBalancerRequestFactory,LoadBalancerRequestFactory的实例在此Configuration中被注入类,而LoadBalancerClient的实例却没有。那么LoadBalancerClient的实例是在哪里实例化的呢?答案是RibbonAutoConfiguration,这个Configuration注入了LoadBalancerClient的实现类RibbonLoadBalancerClient的实例和SpringClientFactory的实例,相关源码如下:

@Bean
public SpringClientFactory springClientFactory() {
    SpringClientFactory factory = new SpringClientFactory();
    factory.setConfigurations(this.configurations);
    return factory;
}

@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
    return new RibbonLoadBalancerClient(springClientFactory());
}

至此拦截器LoadBalancerInterceptor创建完成并且保存在了RestTemplate的集合属性中,那么RestTemplate是如何利用此拦截器的呢?当我们使用RestTemplate发起请求时最终会调用到RestTemplate的doExecute()方法,此方法会创建ClientHttpRequest对象并调用其execute()方法发起请求,源码如下:

protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
        @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

    ClientHttpResponse response = null;
    try {
        // 1. 创建ClientHttpRequest。
        ClientHttpRequest request = createRequest(url, method);
        if (requestCallback != null) {
            requestCallback.doWithRequest(request);
        }
        // 2. 执行其execute()方法获取结果。
        response = request.execute();
        handleResponse(url, method, response);
        return (responseExtractor != null ? responseExtractor.extractData(response) : null);
    }
    catch (IOException ex) {
        String resource = url.toString();
        String query = url.getRawQuery();
        resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
        throw new ResourceAccessException("I/O error on " + method.name() +
                " request for \\"" + resource + "\\": " + ex.getMessage(), ex);
    }
    finally {
        if (response != null) {
            response.close();
        }
    }
}

protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
    ClientHttpRequest request = getRequestFactory().createRequest(url, method);
    if (logger.isDebugEnabled()) {
        logger.debug("HTTP " + method.name() + " " + url);
    }
    return request;
}

@Override
public ClientHttpRequestFactory getRequestFactory() {
    List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
    if (!CollectionUtils.isEmpty(interceptors)) {
        ClientHttpRequestFactory factory = this.interceptingRequestFactory;
        if (factory == null) {
            factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
            this.interceptingRequestFactory = factory;
        }
        return factory;
    }
    else {
        return super.getRequestFactory();
    }
}

从上面的getRequestFactory()方法可以看到当集合interceptors不为空的时候ClientHttpRequest对象是由类InterceptingClientHttpRequestFactory的createRequest()方法创建出来的,并且集合interceptors作为参数传递到了InterceptingClientHttpRequestFactory中,深入InterceptingClientHttpRequestFactory的createRequest()方法,如下:

public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {

    private final List<ClientHttpRequestInterceptor> interceptors;

    public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory,
            @Nullable List<ClientHttpRequestInterceptor> interceptors) {

        super(requestFactory);
        this.interceptors = (interceptors != null ? interceptors : Collections.emptyList());
    }

    @Override
    protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
        // 直接返回InterceptingClientHttpRequest对象。
        return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
    }

}

可以看到拦截器最终传递到了InterceptingClientHttpRequest中,上面说了RestTemplate的doExecute()方法创建了InterceptingClientHttpRequest对象且调用了其execute()方法获取响应结果,深入其execute()方法发现在execute()中直接调用了拦截器的intercept()方法,也即InterceptingClientHttpRequest的intercept()方法,源码如下:

public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    if (this.iterator.hasNext()) 以上是关于Ribbon原理解析的主要内容,如果未能解决你的问题,请参考以下文章

Ribbon原理解析

Ribbon原理解析

Ribbon负载均衡原理

Ribbon负载均衡原理

Ribbon负载均衡原理

撸一撸Spring Cloud Ribbon的原理