ribbon源码解析

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ribbon源码解析相关的知识,希望对你有一定的参考价值。

参考技术A

ribbon的使用非常简单,只需要在spring容器中注入一个带有@LoadBalanced注解的RestTmplate就能直接使用

这里先编写一个服务,供client端调用

server端application.properties配置

client端直接调用

client端application.properties配置,这里我们配置了服务端的地址

之后访问本地/test接口,调用成功。

上面简单的配置就能使用RestTmplate来进行服务间的调用,这里配上ribbon的其他配置

其中ribbon自带的负载均衡策略有如下

接下来,我们看下为什么通过服务名并且加上@LoadBalanced注解就能实现对服务的调用并且还能实现负载均衡的功能

这里我们可以看到,此处注入所有带了注解@LoadBalanced的RestTemplate类。

这里有一点须知,@LoadBalanced注解实际上什么都没有,就是一个@Qualifier,而这个@Qualifier可以认为是一个标记,标识只将带有该标记的类注入进来。

这里我们可以看到,这个类就是给这些RestTmplate添加了LoadBalancerInterceptor拦截器。

而这里RestTmplate.getForObject方法最终会进入到execute这个方法,所有最终会到LoadBalancerInterceptor类,我们继续看LoadBalancerInterceptor.intercept方法

此处loadBalancer就是RibbonLoadBalancerClient,他来自于RibbonAutoConfiguration中的装配,serviceName就是调用的服务命,这里就是主机地址,也就是ribbon-server

继续进入RibbonLoadBalancerClient.execute(String serviceId, LoadBalancerRequest<T> request)

最终来到

这里可以先明确一下三个字段的值, defaultConfigType,propertySourceName,propertyName分别为RibbonClientConfiguration.class,ribbon,ribbon.client.name

而createContext方法做的主要几个事就是,在spring容器中装配了两个类PropertyPlaceholderAutoConfiguration,RibbonClientConfiguration,并且往spring环境中加了一个配置ribbon.client.name=ribbon-server。
为什么要这样做呢,在来看下RibbonClientConfiguration这个类

原来他是将RibbonClientConfiguration注册到容器中为之后容器能加在该类中的bean,而且我们看到一个name,而这个name就是通过之前注入的属性ribbon.client.name=ribbon-server最后注入到该字段上

并且ZoneAwareLoadBalancer初始化的时候,就把我们配置的ribbon-server.ribbon.listOfServers封装了成Server对象

到这里,ILoadBalancer loadBalancer = getLoadBalancer(serviceId)此处的ILoadBalancer就是ZoneAwareLoadBalancer

最终又会调用Irule路由规则获取到一个具体的Server,而IRule如果没有配置就是默认的RoundRobinRule轮询策略。

最终又会调用 request.apply ,此时的request则是我们之前传进来的

ServiceRequestWrapper会将服务名替换成具体的ip然后执行http请求

Ribbon从入门到源码解析

目录

1、简介

2、案例

2.1 搭建服务注册中心EurekaServer

2.2 搭建order-service服务

2.3 搭建user-service服务

2.4 服务启动

2.5 测试结果

3、Ribbon如何实现负载均衡

3.1 拦截http请求

3.2 解析请求中的服务名

3.3 根据服务名获取服务IP和Port信息

3.4 根据负载均衡策略发起http请求

4、简单源码解析

4.1 ILoadBalancer

4.2 AbstractLoadBalancer

4.3 BaseLoadBalancer

4.4 DynamicServerListLoadBalancer

4.5 ZoneAwareLoadBalancer


1、简介

在微服务架构中,服务拆分成一个个的微服务,并且以集群化的方式进行部署;此时服务与服务之间的调用变得复杂了起来,客户端需要自主选择调用服务端集群中的某个服务,这就是我们经常说到的客户端负载均衡,在Spring Cloud生态中使用的比较广泛的技术是Ribbon。

2、案例

无论是使用Fegin还是RestTemplate发起服务调用,客户端负载均衡均是通过Ribbon来实现,这里使用RestTemplate演示案例。

2.1 搭建服务注册中心EurekaServer

  • pom依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
  • application.yml

server:
  port: 18888

spring:
  application:
    name: eurekaServer

eureka:
  client:
#    fetch-registry: false
#    register-with-eureka: false
    service-url:
      defaultZone: http://127.0.0.1:18888/eureka
  • 启动类

@EnableEurekaServer
@SpringBootApplication
public class EurekaServerApplication 

    public static void main(String[] args) 
        SpringApplication.run(EurekaServerApplication.class, args);
    


2.2 搭建order-service服务

  • pom依赖

<!--web-->
<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--EurekaClient-->
<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
  • application.yml

# server port
server:
  port: 18070

# name
spring:
  application:
    name: order-service

# eureka server
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:18888/eureka
  • 模拟业务代码

@RestController
@RequestMapping("order")
public class OrderController 
    
    @Autowired
    private OrderService orderService;
    
    @GetMapping("orderId")
    public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) 
        // 根据id查询订单并返回
        return orderService.queryOrderById(orderId);
    

@Service
public class OrderService 

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private OrderMapper orderMapper;

    public Order queryOrderById(Long orderId) 
        // 1.查询订单
        Order order = orderMapper.findById(orderId);
        // 2、查询用户信息
        if (Objects.nonNull(order)) 
            String url = String.format("http://user-service/user/%s", order.getUserId());
            User user = restTemplate.getForObject(url, User.class);
            // 3、封装用户信息
            order.setUser(user);
        
        // 4.返回
        return order;
    

  • 启动类中注入RestTemplate并开启负载均衡

@MapperScan("com.lzb.order.mapper")
@SpringBootApplication
@EnableEurekaClient
public class OrderApplication 

    public static void main(String[] args) 
        SpringApplication.run(OrderApplication.class, args);
    

    /**
     * RestTemplate bean容器的注入
     * LoadBalanced 负载均衡注解
     * @return
     */
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() 
        return new RestTemplate();
    


2.3 搭建user-service服务

  • pom依赖

<!--web-->
<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--EurekaClient-->
<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

  • application.yml

# server port
server:
  port: 18080

# name
spring:
  application:
    name: user-service

# eureka server
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:18888/eureka

  • 模拟业务代码

@RestController
@RequestMapping("/user")
public class UserController 

    @Autowired
    private UserService userService;

    @GetMapping("/id")
    public User queryById(@PathVariable("id") Long id) 
        return userService.queryById(id);
    


  • 启动类

@MapperScan("com.lzb.user.mapper")
@SpringBootApplication
@EnableEurekaClient
public class UserApplication 
    public static void main(String[] args) 
        SpringApplication.run(UserApplication.class, args);
    


2.4 服务启动

在上述服务搭建之后,可以看出order-service服务调用了user-service服务,因此我将user-service服务集群部署,并且在order-service注入了RestTemplate且标注了LoadBalanced注解;启动顺序如下所示:

  • 启动EurekaServer

  • 启动user-service

  • 启动user-service2

  • 启动order-service

关于IDEA 如何集群启动某个服务,方式比较多,我这里介绍一种常用的方法,步骤如下:

  • 首先启动该服务,直至服务启动成功

  • 右键启动的服务,选择Copy Configuration

  • Edit Configuration中修改服务Name;传入端口参数,在Environment中的VM options键入-Dserver.port=xxxx;点击Apply;点击OK即可;

  • 启动服务,右上角选择刚刚编辑的服务信息,DEBUG启动即可。

  • 服务启动后Eureka Server中服务注册信息如下所示

2.5 测试结果

清空user-service和user-service2的控制台日志,在浏览器中请求四次order-service,order-service中会通过RestTemplate调用order-service,由于RestTemplate使用了LoadBlanced注解修饰,因此Ribbon托管了RestTemplate,在发起调用之前会解析服务名获取服务Ip和port,然后根据负载均衡策略选择服务进行调用!

可以在console打印的日志中看出,第一次请求大到了user-service,第二次请求打到了user-service1,第三次请求大到了user-service,第四次请求打到了user-service1

3、Ribbon如何实现负载均衡

可以试想一下,如果是你本人去实现一个Ribbon的功能你会怎么做?我想大家的思路应该都差不多如下:

  • 拦截Http请求

  • 解析请求中的服务名

  • 在Eureka Client拉取的Eureka Server中注册的可用服务信息中,根据服务名获取服务IP和Port信息

  • 根据负载均衡策略选择服务提供者发起http请求

3.1 拦截http请求

在springboot中常用的拦截器有三个:

  • org.springframework.web.servlet.HandlerInterceptor

  • org.springframework.http.client.ClientHttpRequestInterceptor

  • feign.RequestInterceptor

三者均是对http请求进行拦截,但是3个拦截器应用的项目不同,HandlerInterceptor主要是处理http servlet请求;ClientHttpRequestInterceptor主要是处理HttpTemplate请求或者Ribbon请求;RequestInterceptor用于处理Fegin请求,Fegin本质上是http请求;因此很明显,Ribbon实现的是ClientHttpRequestInterceptor拦截器。

3.2 解析请求中的服务名

org.springframework.http.client.ClientHttpRequestInterceptor接口中只有一个方法intercept(),其子类均会重写该方法org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor,在该方法入口处打上断点。并且在浏览器中访问order-service,order-service中会使用RestTemplate请求user-service

此时可以看到request.getURI()得到的是http://user-service/user/4 通过final URI originalUri = request.getURI(); String serviceName = originalUri.getHost();解析获得服务名

3.3 根据服务名获取服务IP和Port信息

在org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor类中重写的intercept()方法,最后一行代码至关重要,this.requestFactory.createRequest(request, body, execution)为包装http请求,不是很重要,最终的是org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient类中execute()方法。

此处的serviceId即为服务名user-service,this.getLoadBalancer(serviceId);会根据服务名从eureka中解析中对应的服务地址和端口。 this.getLoadBalancer(serviceId)方法调用了org.springframework.cloud.netflix.ribbon.SpringClientFactory类中的getLoadBalancer()方法,随后调用了org.springframework.cloud.netflix.ribbon.SpringClientFactory.getInstance()方法,之后调用了其父类org.springframework.cloud.context.named.NamedContextFactory.getInstance()方法,最终返回org.springframework.context.annotation.AnnotationConfigApplicationContext,可以看到其实获取的是spring 容器中的ILoadBalancer.class实现类com.netflix.loadbalancer.DynamicServerListLoadBalancer实例。 那现在还有最后一个问题,DynamicServerListLoadBalancer实例中的服务信息是怎么来的呢?这里其实是Eureka Clinet从Eureka Server中拉取的服务列表。

3.4 根据负载均衡策略发起http请求

最后一步就是根据负载均衡策略选择服务提供者发起http请求,负载均衡策略的选择在com.netflix.loadbalancer.ZoneAwareLoadBalancer的chooseServer()方法中实现。在选择发起请求的服务之后执行org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient中的execute()方法即完成整个Ribbon负载均衡过程。

4、简单源码解析

在Ribbon整个源码体系中,ILoadBalancer接口的类关系图十分重要,因此源码解析也会根据这张图的类关系图来。

4.1 ILoadBalancer

com.netflix.loadbalancer.ILoadBalancer是一个顶层接口类,该类中定义了几个未实现的方法,具体实现在子类中完成。

方法作用如下所示:

方法名作用
addServers1、服务器列表初始化
2、添加新的服务
chooseServer从负载均衡器中选择服务器
markServerDown负载均衡客户端主动通知下机,否则不可用的服务将会存活到下一个ping周期
getServerList@Deprecated
getReachableServers获取能正常访问的服务器
getAllServers获取所有已知的服务器,包括可访问的和不可访问的

4.2 AbstractLoadBalancer

com.netflix.loadbalancer.AbstractLoadBalancer是一个抽象类,它实现了com.netflix.loadbalancer.ILoadBalancer接口;其源码非常少,如下所示:

public abstract class AbstractLoadBalancer implements ILoadBalancer 
    
    public enum ServerGroup
        ALL,
        STATUS_UP,
        STATUS_NOT_UP        
    

    public Server chooseServer() 
     return chooseServer(null);
    

    public abstract List<Server> getServerList(ServerGroup serverGroup);

    public abstract LoadBalancerStats getLoadBalancerStats();    


AbstractLoadBalancer抽象类中定义类一个ServerGroup内部枚举类,ServerGroup用于标志服务实例的分组类型:

  • ALL 表示所有服务

  • STATUS_UP 表示正常服务

  • STATUS_NOT_UP 表示下线服务

4.3 BaseLoadBalancer

com.netflix.loadbalancer.BaseLoadBalancer类继承了com.netflix.loadbalancer.AbstractLoadBalancer,BaseLoadBalancer类源码比较复杂,但是有几个点是比较重要的。

  • allServerList 用于保存所有服务实例

  • upServerList用于保存所有在线服务实例

@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
        .synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
        .synchronizedList(new ArrayList<Server>());

  • 定义负载均衡默认策略为轮询

private final static IRule DEFAULT_RULE = new RoundRobinRule(); 
protected IRule rule = DEFAULT_RULE;

  • IPingStrategy表示服务检查策略,用于检查服务是否可用;默认的服务检查策略为SerialPingStrategy,SerialPingStrategy中的pingServers方法就是遍历所有服务实例,一个个发送ping请求,查看服务是否有效。

private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;

  • BaseLoadBalancer构造函数中启动了一个PingTask,PingTask每隔10秒钟会ping一次服务列表中的服务是否可用,PingTask中干的事情就是pingStrategy服务检查策略。

protected int pingIntervalSeconds = 10;

public BaseLoadBalancer() 
    this.name = DEFAULT_NAME;
    this.ping = null;
    setRule(DEFAULT_RULE);
    setupPingTask();
    lbStats = new LoadBalancerStats(DEFAULT_NAME);


void setupPingTask() 
    if (canSkipPing()) 
        return;
    
    if (lbTimer != null) 
        lbTimer.cancel();
    
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();


4.4 DynamicServerListLoadBalancer

com.netflix.loadbalancer.DynamicServerListLoadBalancer类继承了com.netflix.loadbalancer.BaseLoadBalancer,因此DynamicServerListLoadBalancer类主要是对BaseLoadBalancer类功能进行扩展,DynamicServerListLoadBalancer类源码比较复杂,但是有几个点是比较重要的。

  • serverListImpl是DynamicServerListLoadBalancer中声明的ServerList类型的变量,ServerList接口中定义了两个方法

volatile ServerList<T> serverListImpl;

  • getInitialListOfServers方法用于获取所有初始化服务列表

  • getUpdatedListOfServers方法用于获取更新的服务实例列表

public interface ServerList<T extends Server> 

    public List<T> getInitialListOfServers();
    
    public List<T> getUpdatedListOfServers();   



  • ServerList接口有5个实现类,DynamicServerListLoadBalancer默认实现是DomainExtractingServerList,但是DomainExtractingServerList构造函数中传入的是DiscoveryEnabledNIWSServerList(可以看我下面Debug的图),因此可以看出重点类其实是DiscoveryEnabledNIWSServerList

  • DiscoveryEnabledNIWSServerList类中一个比较重要的方法是obtainServersViaDiscovery方法,可以从名字看出这是通过注册中心获取服务列表,代码中可以看出依赖 EurekaClient从服务注册中心中获取具体的服务实例InstanceInfo

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() 
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) 
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
        

        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses!=null)
            for (String vipAddress : vipAddresses.split(",")) 
                // if targetRegion is null, it will be interpreted as the same region of client
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) 
                    if (ii.getStatus().equals(InstanceStatus.UP)) 

                        if(shouldUseOverridePort)
                            if(logger.isDebugEnabled())
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            

                            // copy is necessary since the InstanceInfo builder just uses the original reference,
                            // and we don't want to corrupt the global eureka copy of the object which may be
                            // used by other clients in our system
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure)
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            else
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            
                        

                        DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                        serverList.add(des);
                    
                
                if (serverList.size()>0 && prioritizeVipAddressBasedServers)
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                
            
        
        return serverList;
    

  • DiscoveryEnabledNIWSServerList类中另一个比较重要点是定义了一个ServerListUpdater.UpdateAction更新器,该更新器用于更新服务信息。ServerListUpdater提供两个实现类com.netflix.niws.loadbalancer.EurekaNotificationServerListUpdater和com.netflix.loadbalancer.PollingServerListUpdater;其中EurekaNotificationServerListUpdater通过Eureka的事件监听机制来更新服务信息;而此处默认的是PollingServerListUpdater定时任务更新机制。

  • PollingServerListUpdater代码中可以看出定时任务延迟启动initialDelayMs为1秒,刷新频率refreshIntervalMs为30秒

private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;  

public PollingServerListUpdater() 
    this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);


public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) 
    this.initialDelayMs = initialDelayMs;
    this.refreshIntervalMs = refreshIntervalMs;


public synchronized void start(final UpdateAction updateAction) 
    if (isActive.compareAndSet(false, true)) 
        final Runnable wrapperRunnable = new Runnable() 
            @Override
            public void run() 
                if (!isActive.get()) 
                    if (scheduledFuture != null) 
                        scheduledFuture.cancel(true);
                    
                    return;
                
                try 
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                 catch (Exception e) 
                    logger.warn("Failed one update cycle", e);
                
            
        ;

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
     else 
        logger.info("Already active, no-op");
    


  • 在DynamicServerListLoadBalancer定义了一个变量ServerListFilter,可以看到在updateListOfServers方法中,会判断filter是否为空,然后对getUpdatedListOfServers获取到的服务列表servers执行getFilteredListOfServers方法,其实就是对服务列表根据ServerListFilter接口的实现类逻辑进行过滤。

volatile ServerListFilter<T> filter;

public void updateListOfServers() 
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) 
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for  obtained from Discovery client: ",
                getIdentifier(), servers);

        if (filter != null) 
            servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for  obtained from Discovery client: ",
                    getIdentifier(), servers);
        
    
    updateAllServerList(servers);


  • ServerListFilter的实现类如下所示,默认的实现类是DefaultNIWSServerListFilter,但是DefaultNIWSServerListFilter啥也没有,仅仅继承了ZoneAffinityServerListFilter;因此具体的功能还是在ZoneAffinityServerListFilter中实现,而ZoneAffinityServerListFilter主要提供的是对服务提供者所处的Zone和服务消费者所在的Zone进行比较,过滤掉不在一个Zone的实例。

4.5 ZoneAwareLoadBalancer

com.netflix.loadbalancer.ZoneAwareLoadBalancer是com.netflix.loadbalancer.DynamicServerListLoadBalancer的唯一子类,在DynamicServerListLoadBalancer中还有一个非常重要的方法没有实现,那就是chooseServer方法。chooseServer用于负载均衡器选择服务器进行调用,因此ZoneAwareLoadBalancer的出现就是解决这个问题。此外ZoneAwareLoadBalancer重写了setServerListForZones方法,setServerListForZones方法getLoadBalancer(zone)用于创建负载均衡器; existingLBEntry.getValue().setServersList(Collections.emptyList())用于清除不包含server的zone

protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) 
    super.setServerListForZones(zoneServersMap);
    if (balancers == null) 
        balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
    
    for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) 
        String zone = entry.getKey().toLowerCase();
        getLoadBalancer(zone).setServersList(entry.getValue());
    
    // check if there is any zone that no longer has a server
    // and set the list to empty so that the zone related metrics does not
    // contain stale data
    for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) 
        if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) 
            existingLBEntry.getValue().setServersList(Collections.emptyList());
        
    
 

 👇🏻 关注公众号 我们一起进大厂👇🏻     

以上是关于ribbon源码解析的主要内容,如果未能解决你的问题,请参考以下文章

Ribbon从入门到源码解析

Ribbon从入门到源码解析

Ribbon从入门到源码解析

Ribbon从入门到源码解析

Ribbon核心API源码解析:ribbon-coreIClient请求客户端 - 02

spring cloud ribbon源码解析