spring cloud kubernetes源码解析之feign与loadbalancer
Posted 水中加点糖
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring cloud kubernetes源码解析之feign与loadbalancer相关的知识,希望对你有一定的参考价值。
现在是2021年的一个星期四,
窗外是刚下过大雨的夜色,伴着几点稀零的灯光。
我戴着耳机,
在客厅阳台上的书桌前吹着安静的风,
耳机中正好播放“夜风凛凛 独回望旧事前尘”张国荣的歌。
不知道不觉间,我来公司也已经二年了。
二年间匆匆过去,同样也预示着我也从大学毕业正好四年了。
在公司这二年间,我见证了公司的微服务从docker->docker swarm->kubernetes迁移的一路历程。
同时变化的也还有青涩的自己,从中级JAVA->项目技术负责人->team leader,一切感觉都是那么的真切。
一次次新模块产品的上线、一次次的重构后性能的极速提升、每次经历上万QPS并发时激动的心情,那些个激情澎湃的每一天。
直到一个多月前的一天,我接受了同学的建议,让我出去面试看看,试试外面更大规模公司的机会,几轮面试一路通过。
拿到offer后犹豫了很久,在同学的劝说下,我最终还是向公司提出了离职。
最后交接的两周,看着曾经写下的代码,重构后的部分模块,我竟后悔就不该一时冲动提出离职,也曾好几夜的胡思乱想与失眠。
尽管后面找了机会约了公司领导单独聊了聊,直到领导祝我在新的公司好好干后,我才明白职场中的忠诚度是什么。离职需谨慎!
既然没有了后悔的机会,也没有别的退路,那就勇往直前地向前冲吧(至少钱多)。
去新的公司好好干。
一夜过去,周五了。
我也早已想好,离职最后一天的我只需要在工位上静静地坐着,等着时间滴答滴答地走。一到点我起身,给周围同事一个笑容,之后默默地背起装着陪伴了我2年的办公用品的书包,给领导打声照顾说声再见。
再之后便是转身,只需走向办公室门外的电梯口,不再回头,按下电梯。
我看着电梯的门缓缓地合闭并关上。那一刻,也就是我和这可爱的公司、可爱的同事、可爱的领导短暂离别的那一刻。
根据了解,新的公司项目中没有使用spring cloud相关的技术栈,去了新公司后也要和这个高大上的框架做一个短暂的告别,甚是不舍。
趁着离职的最后一天也没什么工作安排,在公司里最后再记录一下关于spring cloud kubernetes框架中关于loadbalancer部分源码的阅读笔记。
前言
结合公司产品的部署环境都是部署在k8s下,为了让微服务更加配合k8s的环境,在我的建议下,领导最终同意了采用spring cloud kubernetes框架作为spring cloud服务发现和治理的组件。
修改为spring cloud kubernetes之后的项目,在经过了1年多的线上运行,经历了上千万笔订单和在上万QPS的验证下,目前一切运行良好。
其中项目中服务调用服务是通过feign调用的,在feign调用的过程中会使用到loadbalancer,本文将通过阅读源码,来看一下在spring cloud kubernetes中feign调用时的核心代码链,以及spring cloud kubernetes中loadbacner两种负载模式获取最终url的过程与区别是什么。
spring cloud feign调用主要过程
feign.ReflectiveFeign.FeignInvocationHandler#invoke
feign.SynchronousMethodHandler#invoke
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable
Request request = targetRequest(template);
if (logLevel != Logger.Level.NONE)
logger.logRequest(metadata.configKey(), logLevel, request);
Response response;
long start = System.nanoTime();
try
response = client.execute(request, options);
org.springframework.cloud.openfeign.loadbalancer.RetryableFeignBlockingLoadBalancerClient#execute
@Override
public Response execute(Request request, Request.Options options) throws IOException
final URI originalUri = URI.create(request.url());
String serviceId = originalUri.getHost();
Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(serviceId,
loadBalancerClient);
RetryTemplate retryTemplate = buildRetryTemplate(serviceId, request, retryPolicy);
return retryTemplate.execute(context ->
Request feignRequest = null;
ServiceInstance retrievedServiceInstance = null;
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
.getSupportedLifecycleProcessors(
loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
RetryableRequestContext.class, ResponseData.class, ServiceInstance.class);
String hint = getHint(serviceId);
DefaultRequest<RetryableRequestContext> lbRequest = new DefaultRequest<>(
new RetryableRequestContext(null, buildRequestData(request), hint));
// On retries the policy will choose the server and set it in the context
// and extract the server and update the request being made
if (context instanceof LoadBalancedRetryContext)
LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
ServiceInstance serviceInstance = lbContext.getServiceInstance();
if (serviceInstance == null)
if (LOG.isDebugEnabled())
LOG.debug("Service instance retrieved from LoadBalancedRetryContext: was null. "
+ "Reattempting service instance selection");
ServiceInstance previousServiceInstance = lbContext.getPreviousServiceInstance();
lbRequest.getContext().setPreviousServiceInstance(previousServiceInstance);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
//通过负载均衡器获取一个instance
retrievedServiceInstance = loadBalancerClient.choose(serviceId, lbRequest);
if (LOG.isDebugEnabled())
LOG.debug(String.format("Selected service instance: %s", retrievedServiceInstance));
lbContext.setServiceInstance(retrievedServiceInstance);
if (retrievedServiceInstance == null)
if (LOG.isWarnEnabled())
LOG.warn("Service instance was not resolved, executing the original request");
org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
retrievedServiceInstance);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
.onComplete(new CompletionContext<ResponseData, ServiceInstance, RetryableRequestContext>(
CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
feignRequest = request;
else
if (LOG.isDebugEnabled())
LOG.debug(String.format("Using service instance from LoadBalancedRetryContext: %s",
retrievedServiceInstance));
String reconstructedUrl = loadBalancerClient.reconstructURI(retrievedServiceInstance, originalUri)
.toString();
feignRequest = buildRequest(request, reconstructedUrl);
org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
retrievedServiceInstance);
Response response = LoadBalancerUtils.executeWithLoadBalancerLifecycleProcessing(delegate, options,
feignRequest, lbRequest, lbResponse, supportedLifecycleProcessors,
retrievedServiceInstance != null);
int responseStatus = response.status();
if (retryPolicy != null && retryPolicy.retryableStatusCode(responseStatus))
if (LOG.isDebugEnabled())
LOG.debug(String.format("Retrying on status code: %d", responseStatus));
response.close();
throw new RetryableStatusCodeException(serviceId, responseStatus, response, URI.create(request.url()));
return response;
, new LoadBalancedRecoveryCallback<Response, Response>()
@Override
protected Response createResponse(Response response, URI uri)
return response;
);
org.springframework.cloud.loadbalancer.blocking.client.BlockingLoadBalancerClient#choose(java.lang.String, org.springframework.cloud.client.loadbalancer.Request)
@Override
public <T> ServiceInstance choose(String serviceId, Request<T> request)
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
if (loadBalancer == null)
return null;
Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
if (loadBalancerResponse == null)
return null;
return loadBalancerResponse.getServer();
org.springframework.cloud.loadbalancer.core.RoundRobinLoadBalancer#choose
@SuppressWarnings("rawtypes")
@Override
// see original
// https://github.com/Netflix/ocelli/blob/master/ocelli-core/
// src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.java
public Mono<Response<ServiceInstance>> choose(Request request)
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
org.springframework.cloud.context.named.ClientFactoryObjectProvider#getIfAvailable(java.util.function.Supplier)
public T getIfAvailable(Supplier<T> defaultSupplier) throws BeansException
return this.delegate().getIfAvailable(defaultSupplier);
org.springframework.cloud.context.named.ClientFactoryObjectProvider#getIfAvailable(java.util.function.Supplier)
@Override
public T getIfAvailable(Supplier<T> defaultSupplier) throws BeansException
return delegate().getIfAvailable(defaultSupplier);
org.springframework.beans.factory.support.DefaultListableBeanFactory#getBeanProvider(org.springframework.core.ResolvableType, boolean)
@Override
public <T> ObjectProvider<T> getBeanProvider(ResolvableType requiredType, boolean allowEagerInit)
//org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier
return new BeanObjectProvider<T>()
@Override
public T getObject() throws BeansException
T resolved = resolveBean(requiredType, null, false);
if (resolved == null)
throw new NoSuchBeanDefinitionException(requiredType);
return resolved;
@Override
public T getObject(Object... args) throws BeansException
T resolved = resolveBean(requiredType, args, false);
if (resolved == null)
throw new NoSuchBeanDefinitionException(requiredType);
return resolved;
@Override
@Nullable
public T getIfAvailable() throws BeansException
try
return resolveBean(requiredType, null, false);
catch (ScopeNotActiveException ex)
// Ignore resolved bean in non-active scope
return null;
@Override
public void ifAvailable(Consumer<T> dependencyConsumer) throws BeansException
T dependency = getIfAvailable();
if (dependency != null)
try
dependencyConsumer.accept(dependency);
catch (ScopeNotActiveException ex)
// Ignore resolved bean in non-active scope, even on scoped proxy invocation
@Override
@Nullable
public T getIfUnique() throws BeansException
try
return resolveBean(requiredType, null, true);
catch (ScopeNotActiveException ex)
// Ignore resolved bean in non-active scope
return null;
@Override
public void ifUnique(Consumer<T> dependencyConsumer) throws BeansException
T dependency = getIfUnique();
if (dependency != null)
try
dependencyConsumer.accept(dependency);
catch (ScopeNotActiveException ex)
// Ignore resolved bean in non-active scope, even on scoped proxy invocation
@SuppressWarnings("unchecked")
@Override
public Stream<T> stream()
return Arrays.stream(getBeanNamesForTypedStream(requiredType, allowEagerInit))
.map(name -> (T) getBean(name))
.filter(bean -> !(bean instanceof NullBean));
@SuppressWarnings("unchecked")
@Override
public Stream<T> orderedStream()
String[] beanNames = getBeanNamesForTypedStream(requiredType, allowEagerInit);
if (beanNames.length == 0)
return Stream.empty();
Map<String, T> matchingBeans = CollectionUtils.newLinkedHashMap(beanNames.length);
for (String beanName : beanNames)
Object beanInstance = getBean(beanName);
if (!(beanInstance instanceof NullBean))
matchingBeans.put(beanName, (T) beanInstance);
Stream<T> stream = matchingBeans.values().stream();
return stream.sorted(adaptOrderComparator(matchingBeans));
;
org.springframework.beans.factory.support.DefaultListableBeanFactory.BeanObjectProvider#getIfAvailable会调用resolveBean,
org.springframework.beans.factory.support.DefaultListableBeanFactory#resolveBean
@Nullable
private <T> T resolveBean(ResolvableType requiredType, @Nullable Object[] args, boolean nonUniqueAsNull)
NamedBeanHolder<T> namedBean = resolveNamedBean(requiredType, args, nonUniqueAsNull);
if (namedBean != null)
return namedBean.getBeanInstance();
BeanFactory parent = getParentBeanFactory();
if (parent instanceof DefaultListableBeanFactory)
return ((DefaultListableBeanFactory) parent).resolveBean(requiredType, args, nonUniqueAsNull);
else if (parent != null)
ObjectProvider<T> parentProvider = parent.getBeanProvider(requiredType);
if (args != null)
return parentProvider.getObject(args);
else
return (nonUniqueAsNull ? parentProvider.getIfUnique() : parentProvider.getIfAvailable());
return null;
上面的resolveNamedBean方法:
org.springframework.beans.factory.support.DefaultListableBeanFactory#resolveNamedBean(org.springframework.core.ResolvableType, java.lang.Object[], boolean)
@Nullable
private <T> NamedBeanHolder<T> resolveNamedBean(
ResolvableType requiredType, @Nullable Object[] args, boolean nonUniqueAsNull) throws BeansException
//org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier
Assert.notNull(requiredType, "Required type must not be null");
String[] candidateNames = getBeanNamesForType(requiredType);
if (candidateNames.length > 1)
List<String> autowireCandidates = new ArrayList<>(candidateNames.length);
for (String beanName : candidateNames)
if (!containsBeanDefinition(beanName) || getBeanDefinition(beanName).isAutowireCandidate())
autowireCandidates.add(beanName);
if (!autowireCandidates.isEmpty())
candidateNames = StringUtils.toStringArray(autowireCandidates);
if (candidateNames.length == 1)
return resolveNamedBean(candidateNames[0], requiredType, args);
else if (candidateNames.length > 1)
Map<String, Object> candidates = CollectionUtils.newLinkedHashMap(candidateNames.length);
for (String beanName : candidateNames)
if (containsSingleton(beanName) && args == null)
Object beanInstance = getBean(beanName);
candidates.put(beanName, (beanInstance instanceof NullBean ? null : beanInstance));
else
candidates.put(beanName, getType(beanName));
String candidateName = determinePrimaryCandidate(candidates, requiredType.toClass());
if (candidateName == null)
candidateName = determineHighestPriorityCandidate(candidates, requiredType.toClass());
if (candidateName != null)
Object beanInstance = candidates.get(candidateName);
if (beanInstance == null)
return null;
if (beanInstance instanceof Class)
return resolveNamedBean(candidateName, requiredType, args);
return new NamedBeanHolder<>(candidateName, (T) beanInstance);
if (!nonUniqueAsNull)
throw new NoUniqueBeanDefinitionException(requiredType, candidates.keySet());
return null;
org.springframework.beans.factory.support.DefaultListableBeanFactory#getBeanNamesForType(org.springframework.core.ResolvableType)
@Override
public String[] getBeanNamesForType(ResolvableType type)
//org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier
return getBeanNamesForType(type, true, true);
@Override
public String[] getBeanNamesForType(ResolvableType type, boolean includeNonSingletons, boolean allowEagerInit)
//org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier
Class<?> resolved = type.resolve();
if (resolved != null && !type.hasGenerics())
return getBeanNamesForType(resolved, includeNonSingletons, allowEagerInit);
else
return doGetBeanNamesForType(type, includeNonSingletons, allowEagerInit);
最终获得:
service:
org.springframework.cloud.kubernetes.client.loadbalancer.KubernetesClientServicesListSupplier
pod:
org.springframework.cloud.loadbalancer.core.CachingServiceInstanceListSupplier
service模式下获取远程访问地址过程
待更新,此模式下默认不会走缓存
pod模式下获取远程访问地址过程
待更新,此模式下默认会走缓存
config
org.springframework.cloud.loadbalancer.config.LoadBalancerCacheAutoConfiguration
待更新,具体初始化过程
以上是关于spring cloud kubernetes源码解析之feign与loadbalancer的主要内容,如果未能解决你的问题,请参考以下文章
spring cloud kubernetes在pod模式下服务调用源码解析
spring cloud kubernetes在pod模式下服务调用源码解析
spring cloud kubernetes源码解析之feign与loadbalancer
spring cloud kubernetes源码解析之feign与loadbalancer