ribbon源码解析
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ribbon源码解析相关的知识,希望对你有一定的参考价值。
参考技术Aribbon的使用非常简单,只需要在spring容器中注入一个带有@LoadBalanced注解的RestTmplate就能直接使用
这里先编写一个服务,供client端调用
server端application.properties配置
client端直接调用
client端application.properties配置,这里我们配置了服务端的地址
之后访问本地/test接口,调用成功。
上面简单的配置就能使用RestTmplate来进行服务间的调用,这里配上ribbon的其他配置
其中ribbon自带的负载均衡策略有如下
接下来,我们看下为什么通过服务名并且加上@LoadBalanced注解就能实现对服务的调用并且还能实现负载均衡的功能
这里我们可以看到,此处注入所有带了注解@LoadBalanced的RestTemplate类。
这里有一点须知,@LoadBalanced注解实际上什么都没有,就是一个@Qualifier,而这个@Qualifier可以认为是一个标记,标识只将带有该标记的类注入进来。
这里我们可以看到,这个类就是给这些RestTmplate添加了LoadBalancerInterceptor拦截器。
而这里RestTmplate.getForObject方法最终会进入到execute这个方法,所有最终会到LoadBalancerInterceptor类,我们继续看LoadBalancerInterceptor.intercept方法
此处loadBalancer就是RibbonLoadBalancerClient,他来自于RibbonAutoConfiguration中的装配,serviceName就是调用的服务命,这里就是主机地址,也就是ribbon-server
继续进入RibbonLoadBalancerClient.execute(String serviceId, LoadBalancerRequest<T> request)
最终来到
这里可以先明确一下三个字段的值, defaultConfigType,propertySourceName,propertyName分别为RibbonClientConfiguration.class,ribbon,ribbon.client.name
而createContext方法做的主要几个事就是,在spring容器中装配了两个类PropertyPlaceholderAutoConfiguration,RibbonClientConfiguration,并且往spring环境中加了一个配置ribbon.client.name=ribbon-server。
为什么要这样做呢,在来看下RibbonClientConfiguration这个类
原来他是将RibbonClientConfiguration注册到容器中为之后容器能加在该类中的bean,而且我们看到一个name,而这个name就是通过之前注入的属性ribbon.client.name=ribbon-server最后注入到该字段上
并且ZoneAwareLoadBalancer初始化的时候,就把我们配置的ribbon-server.ribbon.listOfServers封装了成Server对象
到这里,ILoadBalancer loadBalancer = getLoadBalancer(serviceId)此处的ILoadBalancer就是ZoneAwareLoadBalancer
最终又会调用Irule路由规则获取到一个具体的Server,而IRule如果没有配置就是默认的RoundRobinRule轮询策略。
最终又会调用 request.apply ,此时的request则是我们之前传进来的
ServiceRequestWrapper会将服务名替换成具体的ip然后执行http请求
Ribbon从入门到源码解析
目录
4.4 DynamicServerListLoadBalancer
1、简介
在微服务架构中,服务拆分成一个个的微服务,并且以集群化的方式进行部署;此时服务与服务之间的调用变得复杂了起来,客户端需要自主选择调用服务端集群中的某个服务,这就是我们经常说到的客户端负载均衡,在Spring Cloud生态中使用的比较广泛的技术是Ribbon。
2、案例
无论是使用Fegin还是RestTemplate发起服务调用,客户端负载均衡均是通过Ribbon来实现,这里使用RestTemplate演示案例。
2.1 搭建服务注册中心EurekaServer
-
pom依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
-
application.yml
server:
port: 18888
spring:
application:
name: eurekaServer
eureka:
client:
# fetch-registry: false
# register-with-eureka: false
service-url:
defaultZone: http://127.0.0.1:18888/eureka
-
启动类
@EnableEurekaServer
@SpringBootApplication
public class EurekaServerApplication
public static void main(String[] args)
SpringApplication.run(EurekaServerApplication.class, args);
2.2 搭建order-service服务
-
pom依赖
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--EurekaClient-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
-
application.yml
# server port
server:
port: 18070
# name
spring:
application:
name: order-service
# eureka server
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:18888/eureka
-
模拟业务代码
@RestController
@RequestMapping("order")
public class OrderController
@Autowired
private OrderService orderService;
@GetMapping("orderId")
public Order queryOrderByUserId(@PathVariable("orderId") Long orderId)
// 根据id查询订单并返回
return orderService.queryOrderById(orderId);
@Service
public class OrderService
@Autowired
private RestTemplate restTemplate;
@Autowired
private OrderMapper orderMapper;
public Order queryOrderById(Long orderId)
// 1.查询订单
Order order = orderMapper.findById(orderId);
// 2、查询用户信息
if (Objects.nonNull(order))
String url = String.format("http://user-service/user/%s", order.getUserId());
User user = restTemplate.getForObject(url, User.class);
// 3、封装用户信息
order.setUser(user);
// 4.返回
return order;
-
启动类中注入RestTemplate并开启负载均衡
@MapperScan("com.lzb.order.mapper")
@SpringBootApplication
@EnableEurekaClient
public class OrderApplication
public static void main(String[] args)
SpringApplication.run(OrderApplication.class, args);
/**
* RestTemplate bean容器的注入
* LoadBalanced 负载均衡注解
* @return
*/
@Bean
@LoadBalanced
public RestTemplate restTemplate()
return new RestTemplate();
2.3 搭建user-service服务
-
pom依赖
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--EurekaClient-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
-
application.yml
# server port
server:
port: 18080
# name
spring:
application:
name: user-service
# eureka server
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:18888/eureka
-
模拟业务代码
@RestController
@RequestMapping("/user")
public class UserController
@Autowired
private UserService userService;
@GetMapping("/id")
public User queryById(@PathVariable("id") Long id)
return userService.queryById(id);
-
启动类
@MapperScan("com.lzb.user.mapper")
@SpringBootApplication
@EnableEurekaClient
public class UserApplication
public static void main(String[] args)
SpringApplication.run(UserApplication.class, args);
2.4 服务启动
在上述服务搭建之后,可以看出order-service服务调用了user-service服务,因此我将user-service服务集群部署,并且在order-service注入了RestTemplate且标注了LoadBalanced注解;启动顺序如下所示:
-
启动EurekaServer
-
启动user-service
-
启动user-service2
-
启动order-service
关于IDEA 如何集群启动某个服务,方式比较多,我这里介绍一种常用的方法,步骤如下:
-
首先启动该服务,直至服务启动成功
-
右键启动的服务,选择Copy Configuration
-
Edit Configuration中修改服务Name;传入端口参数,在Environment中的VM options键入-Dserver.port=xxxx;点击Apply;点击OK即可;
-
启动服务,右上角选择刚刚编辑的服务信息,DEBUG启动即可。
-
服务启动后Eureka Server中服务注册信息如下所示
2.5 测试结果
清空user-service和user-service2的控制台日志,在浏览器中请求四次order-service,order-service中会通过RestTemplate调用order-service,由于RestTemplate使用了LoadBlanced注解修饰,因此Ribbon托管了RestTemplate,在发起调用之前会解析服务名获取服务Ip和port,然后根据负载均衡策略选择服务进行调用!
可以在console打印的日志中看出,第一次请求大到了user-service,第二次请求打到了user-service1,第三次请求大到了user-service,第四次请求打到了user-service1
3、Ribbon如何实现负载均衡
可以试想一下,如果是你本人去实现一个Ribbon的功能你会怎么做?我想大家的思路应该都差不多如下:
-
拦截Http请求
-
解析请求中的服务名
-
在Eureka Client拉取的Eureka Server中注册的可用服务信息中,根据服务名获取服务IP和Port信息
-
根据负载均衡策略选择服务提供者发起http请求
3.1 拦截http请求
在springboot中常用的拦截器有三个:
-
org.springframework.web.servlet.HandlerInterceptor
-
org.springframework.http.client.ClientHttpRequestInterceptor
-
feign.RequestInterceptor
三者均是对http请求进行拦截,但是3个拦截器应用的项目不同,HandlerInterceptor主要是处理http servlet请求;ClientHttpRequestInterceptor主要是处理HttpTemplate请求或者Ribbon请求;RequestInterceptor用于处理Fegin请求,Fegin本质上是http请求;因此很明显,Ribbon实现的是ClientHttpRequestInterceptor拦截器。
3.2 解析请求中的服务名
org.springframework.http.client.ClientHttpRequestInterceptor接口中只有一个方法intercept(),其子类均会重写该方法org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor,在该方法入口处打上断点。并且在浏览器中访问order-service,order-service中会使用RestTemplate请求user-service
此时可以看到request.getURI()得到的是http://user-service/user/4 通过final URI originalUri = request.getURI(); String serviceName = originalUri.getHost();解析获得服务名
3.3 根据服务名获取服务IP和Port信息
在org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor类中重写的intercept()方法,最后一行代码至关重要,this.requestFactory.createRequest(request, body, execution)为包装http请求,不是很重要,最终的是org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient类中execute()方法。
此处的serviceId即为服务名user-service,this.getLoadBalancer(serviceId);会根据服务名从eureka中解析中对应的服务地址和端口。 this.getLoadBalancer(serviceId)方法调用了org.springframework.cloud.netflix.ribbon.SpringClientFactory类中的getLoadBalancer()方法,随后调用了org.springframework.cloud.netflix.ribbon.SpringClientFactory.getInstance()方法,之后调用了其父类org.springframework.cloud.context.named.NamedContextFactory.getInstance()方法,最终返回org.springframework.context.annotation.AnnotationConfigApplicationContext,可以看到其实获取的是spring 容器中的ILoadBalancer.class实现类com.netflix.loadbalancer.DynamicServerListLoadBalancer实例。 那现在还有最后一个问题,DynamicServerListLoadBalancer实例中的服务信息是怎么来的呢?这里其实是Eureka Clinet从Eureka Server中拉取的服务列表。
3.4 根据负载均衡策略发起http请求
最后一步就是根据负载均衡策略选择服务提供者发起http请求,负载均衡策略的选择在com.netflix.loadbalancer.ZoneAwareLoadBalancer的chooseServer()方法中实现。在选择发起请求的服务之后执行org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient中的execute()方法即完成整个Ribbon负载均衡过程。
4、简单源码解析
在Ribbon整个源码体系中,ILoadBalancer接口的类关系图十分重要,因此源码解析也会根据这张图的类关系图来。
4.1 ILoadBalancer
com.netflix.loadbalancer.ILoadBalancer是一个顶层接口类,该类中定义了几个未实现的方法,具体实现在子类中完成。
方法作用如下所示:
方法名 | 作用 |
---|---|
addServers | 1、服务器列表初始化 |
2、添加新的服务 | |
chooseServer | 从负载均衡器中选择服务器 |
markServerDown | 负载均衡客户端主动通知下机,否则不可用的服务将会存活到下一个ping周期 |
getServerList | @Deprecated |
getReachableServers | 获取能正常访问的服务器 |
getAllServers | 获取所有已知的服务器,包括可访问的和不可访问的 |
4.2 AbstractLoadBalancer
com.netflix.loadbalancer.AbstractLoadBalancer是一个抽象类,它实现了com.netflix.loadbalancer.ILoadBalancer接口;其源码非常少,如下所示:
public abstract class AbstractLoadBalancer implements ILoadBalancer
public enum ServerGroup
ALL,
STATUS_UP,
STATUS_NOT_UP
public Server chooseServer()
return chooseServer(null);
public abstract List<Server> getServerList(ServerGroup serverGroup);
public abstract LoadBalancerStats getLoadBalancerStats();
AbstractLoadBalancer抽象类中定义类一个ServerGroup内部枚举类,ServerGroup用于标志服务实例的分组类型:
-
ALL 表示所有服务
-
STATUS_UP 表示正常服务
-
STATUS_NOT_UP 表示下线服务
4.3 BaseLoadBalancer
com.netflix.loadbalancer.BaseLoadBalancer类继承了com.netflix.loadbalancer.AbstractLoadBalancer,BaseLoadBalancer类源码比较复杂,但是有几个点是比较重要的。
-
allServerList 用于保存所有服务实例
-
upServerList用于保存所有在线服务实例
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
-
定义负载均衡默认策略为轮询
private final static IRule DEFAULT_RULE = new RoundRobinRule();
protected IRule rule = DEFAULT_RULE;
-
IPingStrategy表示服务检查策略,用于检查服务是否可用;默认的服务检查策略为SerialPingStrategy,SerialPingStrategy中的pingServers方法就是遍历所有服务实例,一个个发送ping请求,查看服务是否有效。
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
-
BaseLoadBalancer构造函数中启动了一个PingTask,PingTask每隔10秒钟会ping一次服务列表中的服务是否可用,PingTask中干的事情就是pingStrategy服务检查策略。
protected int pingIntervalSeconds = 10;
public BaseLoadBalancer()
this.name = DEFAULT_NAME;
this.ping = null;
setRule(DEFAULT_RULE);
setupPingTask();
lbStats = new LoadBalancerStats(DEFAULT_NAME);
void setupPingTask()
if (canSkipPing())
return;
if (lbTimer != null)
lbTimer.cancel();
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
4.4 DynamicServerListLoadBalancer
com.netflix.loadbalancer.DynamicServerListLoadBalancer类继承了com.netflix.loadbalancer.BaseLoadBalancer,因此DynamicServerListLoadBalancer类主要是对BaseLoadBalancer类功能进行扩展,DynamicServerListLoadBalancer类源码比较复杂,但是有几个点是比较重要的。
-
serverListImpl是DynamicServerListLoadBalancer中声明的ServerList类型的变量,ServerList接口中定义了两个方法
volatile ServerList<T> serverListImpl;
-
getInitialListOfServers方法用于获取所有初始化服务列表
-
getUpdatedListOfServers方法用于获取更新的服务实例列表
public interface ServerList<T extends Server>
public List<T> getInitialListOfServers();
public List<T> getUpdatedListOfServers();
-
ServerList接口有5个实现类,DynamicServerListLoadBalancer默认实现是DomainExtractingServerList,但是DomainExtractingServerList构造函数中传入的是DiscoveryEnabledNIWSServerList(可以看我下面Debug的图),因此可以看出重点类其实是DiscoveryEnabledNIWSServerList
-
DiscoveryEnabledNIWSServerList类中一个比较重要的方法是obtainServersViaDiscovery方法,可以从名字看出这是通过注册中心获取服务列表,代码中可以看出依赖 EurekaClient从服务注册中心中获取具体的服务实例InstanceInfo
private List<DiscoveryEnabledServer> obtainServersViaDiscovery()
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null)
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null)
for (String vipAddress : vipAddresses.split(","))
// if targetRegion is null, it will be interpreted as the same region of client
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo)
if (ii.getStatus().equals(InstanceStatus.UP))
if(shouldUseOverridePort)
if(logger.isDebugEnabled())
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure)
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
else
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
serverList.add(des);
if (serverList.size()>0 && prioritizeVipAddressBasedServers)
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
return serverList;
-
DiscoveryEnabledNIWSServerList类中另一个比较重要点是定义了一个ServerListUpdater.UpdateAction更新器,该更新器用于更新服务信息。ServerListUpdater提供两个实现类com.netflix.niws.loadbalancer.EurekaNotificationServerListUpdater和com.netflix.loadbalancer.PollingServerListUpdater;其中EurekaNotificationServerListUpdater通过Eureka的事件监听机制来更新服务信息;而此处默认的是PollingServerListUpdater定时任务更新机制。
-
PollingServerListUpdater代码中可以看出定时任务延迟启动initialDelayMs为1秒,刷新频率refreshIntervalMs为30秒
private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
public PollingServerListUpdater()
this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs)
this.initialDelayMs = initialDelayMs;
this.refreshIntervalMs = refreshIntervalMs;
public synchronized void start(final UpdateAction updateAction)
if (isActive.compareAndSet(false, true))
final Runnable wrapperRunnable = new Runnable()
@Override
public void run()
if (!isActive.get())
if (scheduledFuture != null)
scheduledFuture.cancel(true);
return;
try
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
catch (Exception e)
logger.warn("Failed one update cycle", e);
;
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
else
logger.info("Already active, no-op");
-
在DynamicServerListLoadBalancer定义了一个变量ServerListFilter,可以看到在updateListOfServers方法中,会判断filter是否为空,然后对getUpdatedListOfServers获取到的服务列表servers执行getFilteredListOfServers方法,其实就是对服务列表根据ServerListFilter接口的实现类逻辑进行过滤。
volatile ServerListFilter<T> filter;
public void updateListOfServers()
List<T> servers = new ArrayList<T>();
if (serverListImpl != null)
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for obtained from Discovery client: ",
getIdentifier(), servers);
if (filter != null)
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for obtained from Discovery client: ",
getIdentifier(), servers);
updateAllServerList(servers);
-
ServerListFilter的实现类如下所示,默认的实现类是DefaultNIWSServerListFilter,但是DefaultNIWSServerListFilter啥也没有,仅仅继承了ZoneAffinityServerListFilter;因此具体的功能还是在ZoneAffinityServerListFilter中实现,而ZoneAffinityServerListFilter主要提供的是对服务提供者所处的Zone和服务消费者所在的Zone进行比较,过滤掉不在一个Zone的实例。
4.5 ZoneAwareLoadBalancer
com.netflix.loadbalancer.ZoneAwareLoadBalancer是com.netflix.loadbalancer.DynamicServerListLoadBalancer的唯一子类,在DynamicServerListLoadBalancer中还有一个非常重要的方法没有实现,那就是chooseServer方法。chooseServer用于负载均衡器选择服务器进行调用,因此ZoneAwareLoadBalancer的出现就是解决这个问题。此外ZoneAwareLoadBalancer重写了setServerListForZones方法,setServerListForZones方法getLoadBalancer(zone)用于创建负载均衡器; existingLBEntry.getValue().setServersList(Collections.emptyList())用于清除不包含server的zone
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap)
super.setServerListForZones(zoneServersMap);
if (balancers == null)
balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet())
String zone = entry.getKey().toLowerCase();
getLoadBalancer(zone).setServersList(entry.getValue());
// check if there is any zone that no longer has a server
// and set the list to empty so that the zone related metrics does not
// contain stale data
for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet())
if (!zoneServersMap.keySet().contains(existingLBEntry.getKey()))
existingLBEntry.getValue().setServersList(Collections.emptyList());
👇🏻 关注公众号 我们一起进大厂👇🏻
以上是关于ribbon源码解析的主要内容,如果未能解决你的问题,请参考以下文章