Spring Cloud Kubernetes source code analysis: Feign and LoadBalancer

Posted by 水中加点糖

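It is a Thursday in 2021.

Outside the window the night has just been washed by heavy rain, with a few scattered lights.

I have my headphones on,

sitting at the desk on the living-room balcony in a quiet breeze,

and a Leslie Cheung song happens to be playing, right at the line "the night wind is cold; alone I look back on days gone by".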


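Before I knew it, I had been at the company for two years.

Two years went by in a flash, which also means it has been exactly four years since I graduated from university.

In those two years I watched the company's microservices migrate all the way from docker -> docker swarm -> kubernetes.

I changed along the way too, from a green mid-level Java developer -> project tech lead -> team leader, and all of it felt very real.

Launch after launch of new product modules, refactorings that brought dramatic performance gains, the thrill each time we handled tens of thousands of QPS: every one of those days was full of passion.

Then one day, a little over a month ago, I took a former classmate's advice to go out and interview and try my chances at larger companies, and I passed several rounds in a row.

After getting the offer I hesitated for a long time. Persuaded by that classmate, I finally handed in my resignation.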


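During the last two weeks of handover, looking at the code I had written and the modules I had refactored, I actually regretted resigning on impulse, and I spent several nights overthinking and sleepless.

Later I found a chance to talk privately with my manager, but it was only when he wished me well at the new company that I understood what loyalty means in the workplace. Think twice before you resign!

Since there is no chance to take it back and no other way out, I might as well charge ahead (at least the pay is better).

Time to work hard at the new company.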




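A night passed, and it was Friday.

I had long since decided that on my last day I would simply sit quietly at my desk and wait for the clock to tick away. When the time came I would stand up, give the colleagues around me a smile, quietly shoulder the backpack holding the office supplies that had kept me company for two years, and stop by my manager to say goodbye.

Then I would turn around, walk to the elevator outside the office door without looking back, and press the button.

I would watch the elevator doors slowly close. That would be the moment of my brief parting from this lovely company, these lovely colleagues and these lovely managers.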




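From what I have learned, the new company's projects do not use the Spring Cloud stack, so once I am there I will also be saying a temporary goodbye to this impressive framework, which I am reluctant to do.

Since there is no work scheduled on my last day, I am using it to write down, one last time at this company, my notes from reading the loadbalancer-related source code in the Spring Cloud Kubernetes framework.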


Preface

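Because all of our products are deployed on k8s, and to make the microservices fit the k8s environment better, my manager eventually agreed to my proposal to adopt the Spring Cloud Kubernetes framework as the service discovery and governance component for Spring Cloud.

Since the switch to Spring Cloud Kubernetes, the project has run in production for more than a year, has been validated by more than ten million orders and more than ten thousand QPS, and is running well so far.

Services in the project call each other through Feign, and a Feign call goes through the loadbalancer. This article reads the source code to follow the core call chain of a Feign call in Spring Cloud Kubernetes, and to see how the two load-balancing modes of the Spring Cloud Kubernetes loadbalancer arrive at the final URL and how they differ.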

Main steps of a Spring Cloud Feign call

feign.ReflectiveFeign.FeignInvocationHandler#invoke

feign.SynchronousMethodHandler#invoke

  Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
    Request request = targetRequest(template);

    if (logLevel != Logger.Level.NONE) {
      logger.logRequest(metadata.configKey(), logLevel, request);
    }

    Response response;
    long start = System.nanoTime();
    try {
      // Hands the request to the configured Client (the load-balancing client shown next)
      response = client.execute(request, options);
      // ... (the excerpt stops here; the rest of the method is omitted)

org.springframework.cloud.openfeign.loadbalancer.RetryableFeignBlockingLoadBalancerClient#execute

	@Override
	public Response execute(Request request, Request.Options options) throws IOException {
		final URI originalUri = URI.create(request.url());
		String serviceId = originalUri.getHost();
		Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
		final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(serviceId,
				loadBalancerClient);
		RetryTemplate retryTemplate = buildRetryTemplate(serviceId, request, retryPolicy);
		return retryTemplate.execute(context -> {
			Request feignRequest = null;
			ServiceInstance retrievedServiceInstance = null;
			Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
					.getSupportedLifecycleProcessors(
							loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
							RetryableRequestContext.class, ResponseData.class, ServiceInstance.class);
			String hint = getHint(serviceId);
			DefaultRequest<RetryableRequestContext> lbRequest = new DefaultRequest<>(
					new RetryableRequestContext(null, buildRequestData(request), hint));
			// On retries the policy will choose the server and set it in the context
			// and extract the server and update the request being made
			if (context instanceof LoadBalancedRetryContext) {
				LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
				ServiceInstance serviceInstance = lbContext.getServiceInstance();
				if (serviceInstance == null) {
					if (LOG.isDebugEnabled()) {
						LOG.debug("Service instance retrieved from LoadBalancedRetryContext: was null. "
								+ "Reattempting service instance selection");
					}
					ServiceInstance previousServiceInstance = lbContext.getPreviousServiceInstance();
					lbRequest.getContext().setPreviousServiceInstance(previousServiceInstance);
					supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
					// Ask the load balancer to choose one instance
					retrievedServiceInstance = loadBalancerClient.choose(serviceId, lbRequest);
					if (LOG.isDebugEnabled()) {
						LOG.debug(String.format("Selected service instance: %s", retrievedServiceInstance));
					}
					lbContext.setServiceInstance(retrievedServiceInstance);
				}

				if (retrievedServiceInstance == null) {
					if (LOG.isWarnEnabled()) {
						LOG.warn("Service instance was not resolved, executing the original request");
					}
					org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
							retrievedServiceInstance);
					supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
							.onComplete(new CompletionContext<ResponseData, ServiceInstance, RetryableRequestContext>(
									CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
					feignRequest = request;
				}
				else {
					if (LOG.isDebugEnabled()) {
						LOG.debug(String.format("Using service instance from LoadBalancedRetryContext: %s",
								retrievedServiceInstance));
					}
					String reconstructedUrl = loadBalancerClient.reconstructURI(retrievedServiceInstance, originalUri)
							.toString();
					feignRequest = buildRequest(request, reconstructedUrl);
				}
			}
			org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
					retrievedServiceInstance);
			Response response = LoadBalancerUtils.executeWithLoadBalancerLifecycleProcessing(delegate, options,
					feignRequest, lbRequest, lbResponse, supportedLifecycleProcessors,
					retrievedServiceInstance != null);
			int responseStatus = response.status();
			if (retryPolicy != null && retryPolicy.retryableStatusCode(responseStatus)) {
				if (LOG.isDebugEnabled()) {
					LOG.debug(String.format("Retrying on status code: %d", responseStatus));
				}
				response.close();
				throw new RetryableStatusCodeException(serviceId, responseStatus, response, URI.create(request.url()));
			}
			return response;
		}, new LoadBalancedRecoveryCallback<Response, Response>() {
			@Override
			protected Response createResponse(Response response, URI uri) {
				return response;
			}
		});
	}
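
The key idea in execute is that the host part of the Feign URL is really a service id, and once an instance has been chosen the URL is rebuilt around that instance. Below is a minimal JDK-only sketch of that rebuilding step. The class UriReconstructionSketch and its reconstruct method are hypothetical names made up for illustration; the real work is done by loadBalancerClient.reconstructURI, which handles more cases than this.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not part of Spring Cloud: swaps the logical service id
// in a request URL for the host and port of a chosen instance.
class UriReconstructionSketch {

    static URI reconstruct(String instanceHost, int instancePort, URI original) {
        try {
            // Keep scheme, path, query and fragment; replace only host and port.
            return new URI(original.getScheme(), original.getUserInfo(), instanceHost, instancePort,
                    original.getPath(), original.getQuery(), original.getFragment());
        }
        catch (URISyntaxException ex) {
            throw new IllegalArgumentException("Cannot rebuild URI from " + original, ex);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://order-service/api/orders?id=1");
        // The host of the original URI plays the role of the service id.
        String serviceId = original.getHost();
        URI rebuilt = reconstruct("10.0.0.5", 8080, original);
        System.out.println(serviceId + " -> " + rebuilt);
    }
}
```

The service name, IP and port in main are made-up sample values.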

org.springframework.cloud.loadbalancer.blocking.client.BlockingLoadBalancerClient#choose(java.lang.String, org.springframework.cloud.client.loadbalancer.Request)

	@Override
	public <T> ServiceInstance choose(String serviceId, Request<T> request) {
		ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
		if (loadBalancer == null) {
			return null;
		}
		Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
		if (loadBalancerResponse == null) {
			return null;
		}
		return loadBalancerResponse.getServer();
	}

org.springframework.cloud.loadbalancer.core.RoundRobinLoadBalancer#choose

	@SuppressWarnings("rawtypes")
	@Override
	// see original
	// https://github.com/Netflix/ocelli/blob/master/ocelli-core/
	// src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.java
	public Mono<Response<ServiceInstance>> choose(Request request) {
		ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
				.getIfAvailable(NoopServiceInstanceListSupplier::new);
		return supplier.get(request).next()
				.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
	}
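
Once the supplier has produced the instance list, the round-robin part itself is small. The following is a simplified, JDK-only illustration of picking instances in turn, not the actual RoundRobinLoadBalancer code: the class name RoundRobinSketch, the use of plain strings as instances and the starting position of -1 are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified illustration of round-robin selection; not the Spring Cloud class.
class RoundRobinSketch {

    // Starts at -1 so that the first call selects index 0.
    private final AtomicInteger position = new AtomicInteger(-1);

    // Returns null for an empty list, mirroring the "no instance available" case.
    String choose(List<String> instances) {
        if (instances.isEmpty()) {
            return null;
        }
        // Mask the sign bit so the index stays non-negative after int overflow.
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }

    public static void main(String[] args) {
        RoundRobinSketch loadBalancer = new RoundRobinSketch();
        List<String> pods = Arrays.asList("10.0.0.5:8080", "10.0.0.6:8080", "10.0.0.7:8080");
        for (int i = 0; i < 4; i++) {
            System.out.println(loadBalancer.choose(pods));
        }
    }
}
```

The atomic counter lets concurrent callers share one position without locking, which is the property that matters for a load balancer invoked from many request threads.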

org.springframework.cloud.context.named.ClientFactoryObjectProvider#getIfAvailable(java.util.function.Supplier)

    public T getIfAvailable(Supplier<T> defaultSupplier) throws BeansException {
        return this.delegate().getIfAvailable(defaultSupplier);
    }

org.springframework.beans.factory.support.DefaultListableBeanFactory#getBeanProvider(org.springframework.core.ResolvableType, boolean)

	@Override
	public <T> ObjectProvider<T> getBeanProvider(ResolvableType requiredType, boolean allowEagerInit) {
		// requiredType here: org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier
		return new BeanObjectProvider<T>() {
			@Override
			public T getObject() throws BeansException {
				T resolved = resolveBean(requiredType, null, false);
				if (resolved == null) {
					throw new NoSuchBeanDefinitionException(requiredType);
				}
				return resolved;
			}
			@Override
			public T getObject(Object... args) throws BeansException {
				T resolved = resolveBean(requiredType, args, false);
				if (resolved == null) {
					throw new NoSuchBeanDefinitionException(requiredType);
				}
				return resolved;
			}
			@Override
			@Nullable
			public T getIfAvailable() throws BeansException {
				try {
					return resolveBean(requiredType, null, false);
				}
				catch (ScopeNotActiveException ex) {
					// Ignore resolved bean in non-active scope
					return null;
				}
			}
			@Override
			public void ifAvailable(Consumer<T> dependencyConsumer) throws BeansException {
				T dependency = getIfAvailable();
				if (dependency != null) {
					try {
						dependencyConsumer.accept(dependency);
					}
					catch (ScopeNotActiveException ex) {
						// Ignore resolved bean in non-active scope, even on scoped proxy invocation
					}
				}
			}
			@Override
			@Nullable
			public T getIfUnique() throws BeansException {
				try {
					return resolveBean(requiredType, null, true);
				}
				catch (ScopeNotActiveException ex) {
					// Ignore resolved bean in non-active scope
					return null;
				}
			}
			@Override
			public void ifUnique(Consumer<T> dependencyConsumer) throws BeansException {
				T dependency = getIfUnique();
				if (dependency != null) {
					try {
						dependencyConsumer.accept(dependency);
					}
					catch (ScopeNotActiveException ex) {
						// Ignore resolved bean in non-active scope, even on scoped proxy invocation
					}
				}
			}
			@SuppressWarnings("unchecked")
			@Override
			public Stream<T> stream() {
				return Arrays.stream(getBeanNamesForTypedStream(requiredType, allowEagerInit))
						.map(name -> (T) getBean(name))
						.filter(bean -> !(bean instanceof NullBean));
			}
			@SuppressWarnings("unchecked")
			@Override
			public Stream<T> orderedStream() {
				String[] beanNames = getBeanNamesForTypedStream(requiredType, allowEagerInit);
				if (beanNames.length == 0) {
					return Stream.empty();
				}
				Map<String, T> matchingBeans = CollectionUtils.newLinkedHashMap(beanNames.length);
				for (String beanName : beanNames) {
					Object beanInstance = getBean(beanName);
					if (!(beanInstance instanceof NullBean)) {
						matchingBeans.put(beanName, (T) beanInstance);
					}
				}
				Stream<T> stream = matchingBeans.values().stream();
				return stream.sorted(adaptOrderComparator(matchingBeans));
			}
		};
	}

org.springframework.beans.factory.support.DefaultListableBeanFactory.BeanObjectProvider#getIfAvailable calls resolveBean:
org.springframework.beans.factory.support.DefaultListableBeanFactory#resolveBean

	@Nullable
	private <T> T resolveBean(ResolvableType requiredType, @Nullable Object[] args, boolean nonUniqueAsNull) {
		NamedBeanHolder<T> namedBean = resolveNamedBean(requiredType, args, nonUniqueAsNull);
		if (namedBean != null) {
			return namedBean.getBeanInstance();
		}
		BeanFactory parent = getParentBeanFactory();
		if (parent instanceof DefaultListableBeanFactory) {
			return ((DefaultListableBeanFactory) parent).resolveBean(requiredType, args, nonUniqueAsNull);
		}
		else if (parent != null) {
			ObjectProvider<T> parentProvider = parent.getBeanProvider(requiredType);
			if (args != null) {
				return parentProvider.getObject(args);
			}
			else {
				return (nonUniqueAsNull ? parentProvider.getIfUnique() : parentProvider.getIfAvailable());
			}
		}
		return null;
	}
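
What resolveBean does is look in the current bean factory first and only then ask the parent factory. A toy JDK-only sketch of that child-first lookup follows. HierarchicalRegistrySketch and its methods are hypothetical names and this is not Spring's BeanFactory; it only illustrates the lookup order.

```java
import java.util.HashMap;
import java.util.Map;

// Toy registry showing child-first lookup with parent fallback.
// Hypothetical names; this is not Spring's BeanFactory.
class HierarchicalRegistrySketch {

    private final Map<Class<?>, Object> beans = new HashMap<>();
    private final HierarchicalRegistrySketch parent;

    HierarchicalRegistrySketch(HierarchicalRegistrySketch parent) {
        this.parent = parent;
    }

    <T> void register(Class<T> type, T bean) {
        beans.put(type, bean);
    }

    // Look in this registry first, then walk up to the parent; null if nobody has it.
    <T> T resolve(Class<T> type) {
        Object local = beans.get(type);
        if (local != null) {
            return type.cast(local);
        }
        return (parent != null ? parent.resolve(type) : null);
    }

    public static void main(String[] args) {
        HierarchicalRegistrySketch parent = new HierarchicalRegistrySketch(null);
        HierarchicalRegistrySketch child = new HierarchicalRegistrySketch(parent);
        parent.register(String.class, "defined in parent");
        child.register(Integer.class, 42);
        System.out.println(child.resolve(String.class));   // found through the parent
        System.out.println(child.resolve(Integer.class));  // found locally
        System.out.println(parent.resolve(Integer.class)); // the parent never sees child beans
    }
}
```

The lookup only goes upward: a bean registered in the child stays invisible to the parent.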

The resolveNamedBean method called above:
org.springframework.beans.factory.support.DefaultListableBeanFactory#resolveNamedBean(org.springframework.core.ResolvableType, java.lang.Object[], boolean)

	@Nullable
	private <T> NamedBeanHolder<T> resolveNamedBean(
			ResolvableType requiredType, @Nullable Object[] args, boolean nonUniqueAsNull) throws BeansException {
		// requiredType here: org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier

		Assert.notNull(requiredType, "Required type must not be null");
		String[] candidateNames = getBeanNamesForType(requiredType);

		if (candidateNames.length > 1) {
			List<String> autowireCandidates = new ArrayList<>(candidateNames.length);
			for (String beanName : candidateNames) {
				if (!containsBeanDefinition(beanName) || getBeanDefinition(beanName).isAutowireCandidate()) {
					autowireCandidates.add(beanName);
				}
			}
			if (!autowireCandidates.isEmpty()) {
				candidateNames = StringUtils.toStringArray(autowireCandidates);
			}
		}

		if (candidateNames.length == 1) {
			return resolveNamedBean(candidateNames[0], requiredType, args);
		}
		else if (candidateNames.length > 1) {
			Map<String, Object> candidates = CollectionUtils.newLinkedHashMap(candidateNames.length);
			for (String beanName : candidateNames) {
				if (containsSingleton(beanName) && args == null) {
					Object beanInstance = getBean(beanName);
					candidates.put(beanName, (beanInstance instanceof NullBean ? null : beanInstance));
				}
				else {
					candidates.put(beanName, getType(beanName));
				}
			}
			String candidateName = determinePrimaryCandidate(candidates, requiredType.toClass());
			if (candidateName == null) {
				candidateName = determineHighestPriorityCandidate(candidates, requiredType.toClass());
			}
			if (candidateName != null) {
				Object beanInstance = candidates.get(candidateName);
				if (beanInstance == null) {
					return null;
				}
				if (beanInstance instanceof Class) {
					return resolveNamedBean(candidateName, requiredType, args);
				}
				return new NamedBeanHolder<>(candidateName, (T) beanInstance);
			}
			if (!nonUniqueAsNull) {
				throw new NoUniqueBeanDefinitionException(requiredType, candidates.keySet());
			}
		}

		return null;
	}
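
The decision order in resolveNamedBean is: a single candidate is used directly; with several candidates a primary one wins, otherwise the one with the highest priority, and if it is still ambiguous the method either returns null or throws, depending on nonUniqueAsNull. Below is a simplified JDK-only sketch of that order. CandidateSelectionSketch and Candidate are hypothetical names, and the rule that a lower priority value wins is an assumption of this sketch, not something shown in the code above.

```java
import java.util.Arrays;
import java.util.List;

// Simplified illustration of the selection order; hypothetical names,
// not Spring's DefaultListableBeanFactory.
class CandidateSelectionSketch {

    // Stand-in for a bean candidate: lower priority value wins, null means none declared.
    static final class Candidate {
        final String name;
        final boolean primary;
        final Integer priority;

        Candidate(String name, boolean primary, Integer priority) {
            this.name = name;
            this.primary = primary;
            this.priority = priority;
        }
    }

    static String select(List<Candidate> candidates, boolean nonUniqueAsNull) {
        if (candidates.isEmpty()) {
            return null;
        }
        if (candidates.size() == 1) {
            return candidates.get(0).name;
        }
        // Step 1: a primary candidate wins outright.
        Candidate primary = null;
        for (Candidate c : candidates) {
            if (c.primary) {
                if (primary != null) {
                    throw new IllegalStateException("More than one primary candidate");
                }
                primary = c;
            }
        }
        if (primary != null) {
            return primary.name;
        }
        // Step 2: otherwise the highest priority (lowest value) wins.
        Candidate best = null;
        for (Candidate c : candidates) {
            if (c.priority == null) {
                continue;
            }
            if (best == null || c.priority < best.priority) {
                best = c;
            }
            else if (c.priority.equals(best.priority)) {
                throw new IllegalStateException("Candidates share the same priority");
            }
        }
        if (best != null) {
            return best.name;
        }
        // Step 3: still ambiguous, so either give up quietly or fail loudly.
        if (nonUniqueAsNull) {
            return null;
        }
        throw new IllegalStateException("No unique candidate among " + candidates.size());
    }

    public static void main(String[] args) {
        List<Candidate> suppliers = Arrays.asList(
                new Candidate("discoverySupplier", false, null),
                new Candidate("cachingSupplier", true, null));
        System.out.println(select(suppliers, false));
    }
}
```

The bean names in main are invented for the demo.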

org.springframework.beans.factory.support.DefaultListableBeanFactory#getBeanNamesForType(org.springframework.core.ResolvableType)

	@Override
	public String[] getBeanNamesForType(ResolvableType type) {
		// type here: org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier
		return getBeanNamesForType(type, true, true);
	}

	@Override
	public String[] getBeanNamesForType(ResolvableType type, boolean includeNonSingletons, boolean allowEagerInit) {
		// type here: org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier
		Class<?> resolved = type.resolve();
		if (resolved != null && !type.hasGenerics()) {
			return getBeanNamesForType(resolved, includeNonSingletons, allowEagerInit);
		}
		else {
			return doGetBeanNamesForType(type, includeNonSingletons, allowEagerInit);
		}
	}

The ServiceInstanceListSupplier finally resolved is:
service mode:
org.springframework.cloud.kubernetes.client.loadbalancer.KubernetesClientServicesListSupplier

pod mode:
org.springframework.cloud.loadbalancer.core.CachingServiceInstanceListSupplier

How the remote address is obtained in service mode

To be updated. By default this mode does not go through the cache.
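
Until this part is written up, here is a rough sketch of the kind of address service mode would be expected to produce, assuming it targets the Kubernetes Service through its cluster DNS name of the form service.namespace.svc.cluster-domain rather than an individual pod IP. That assumption is mine and is not verified against the framework source here; ServiceModeHostSketch, its methods and the sample values are all hypothetical.

```java
// Hypothetical sketch: builds a Kubernetes Service DNS name. It illustrates an
// assumption about service mode, not code taken from Spring Cloud Kubernetes.
class ServiceModeHostSketch {

    static String serviceHost(String serviceName, String namespace, String clusterDomain) {
        // Fall back to the "default" namespace when none is given.
        String ns = (namespace == null || namespace.isEmpty()) ? "default" : namespace;
        return serviceName + "." + ns + ".svc." + clusterDomain;
    }

    static String serviceUrl(String serviceName, String namespace, String clusterDomain, int port, String path) {
        return "http://" + serviceHost(serviceName, namespace, clusterDomain) + ":" + port + path;
    }

    public static void main(String[] args) {
        // Sample values only.
        System.out.println(serviceUrl("order-service", "prod", "cluster.local", 8080, "/api/orders"));
    }
}
```

With an address like this, the choice of the concrete pod would be left to Kubernetes itself.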

How the remote address is obtained in pod mode

To be updated. By default this mode goes through the cache.
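
Until this part is written up, the general idea of a cached instance list can be sketched in plain Java: look the pods up once per service id, then serve the stored list until it expires. This is only an illustration of caching with a time-to-live, not the code of CachingServiceInstanceListSupplier; the class CachingInstanceListSketch, the injected clock and the TTL value are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical TTL cache in front of an instance lookup; not the Spring Cloud class.
class CachingInstanceListSketch {

    private static final class Entry {
        final List<String> instances;
        final long expiresAtMillis;

        Entry(List<String> instances, long expiresAtMillis) {
            this.instances = instances;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> cache = new ConcurrentHashMap<>();
    private final Function<String, List<String>> delegate;
    private final long ttlMillis;
    private final LongSupplier clock;

    CachingInstanceListSketch(Function<String, List<String>> delegate, long ttlMillis, LongSupplier clock) {
        this.delegate = delegate;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // Returns the cached list while it is fresh; otherwise asks the delegate again.
    List<String> get(String serviceId) {
        long now = clock.getAsLong();
        Entry entry = cache.get(serviceId);
        if (entry == null || now >= entry.expiresAtMillis) {
            entry = new Entry(delegate.apply(serviceId), now + ttlMillis);
            cache.put(serviceId, entry);
        }
        return entry.instances;
    }

    public static void main(String[] args) {
        // The lambda stands in for a lookup of pod addresses; the values are made up.
        CachingInstanceListSketch supplier = new CachingInstanceListSketch(
                serviceId -> Arrays.asList("10.0.0.5:8080", "10.0.0.6:8080"),
                30_000L, System::currentTimeMillis);
        System.out.println(supplier.get("order-service"));
        System.out.println(supplier.get("order-service")); // served from the cache
    }
}
```

The trade-off of any such cache is that a pod that has just gone away can still be returned until its entry expires, which is one reason the retry logic shown earlier matters.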

config

org.springframework.cloud.loadbalancer.config.LoadBalancerCacheAutoConfiguration

To be updated: the detailed initialization process.
