spring cloud ribbon

Posted yanhui007

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring cloud ribbon相关的知识,希望对你有一定的参考价值。


负载策略

RoundRobinRule,轮训策略,默认策略
RandomRule,随机,使用Random对象从服务列表中随机选择一个服务
RetryRule,轮询 + 重试
WeightedResponseTimeRule:优先选择响应时间快,此策略会根据平均响应时间计算所有服务的权重,响应时间越快,服务权重越重、被选中的概率越高。此类有个DynamicServerWeightTask的定时任务,默认情况下每隔30秒会计算一次各个服务实例的权重。刚启动时,如果统计信息不足,则使用RoundRobinRule策略,等统计信息足够,会切换回来
AvailabilityFilteringRule:可用性过滤,会先过滤掉以下服务:由于多次访问故障而断路器处于打开的服务、并发的连接数量超过阈值,然后对剩余的服务列表按照RoundRobinRule策略进行访问
BestAvailableRule:优先选择并发请求最小的,刚启动时吗,如果统计信息不足,则使用RoundRobinRule策略,等统计信息足够,才会切换回来
ZoneAvoidanceRule:可以实现避免可能访问失效的区域(zone)

@LoadBalanced开启了负载功能

@Bean
@LoadBalanced
public RestTemplate restTemplate() {
    return new RestTemplate();
}
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}

在RestTemplate上面增加注解@LoadBalanced,就开启了负载功能。
简单介绍原理:该注解会把所有的的RestTemplate实例都加上@LoadBalanced,后续通过@Qualifier和@Autowire注解可以将所有的RestTemplate都注入获取到。
然后挨个给这些RestTemplate添加拦截器,在拦截器中实现负载逻辑。

RibbonAutoConfiguration自动配置类

@Configuration
@ConditionalOnClass({IClient.class, RestTemplate.class, AsyncRestTemplate.class, Ribbon.class})
@RibbonClients
@AutoConfigureAfter(
        name = {"org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration"}
)
//引入LoadBalancerAutoConfiguration类
@AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
@EnableConfigurationProperties({RibbonEagerLoadProperties.class})
public class RibbonAutoConfiguration {
    @Autowired(
            required = false
    )
    private List<RibbonClientSpecification> configurations = new ArrayList();

    //饥饿加载模式配置
    //正常模式下ribbon client都是到使用的时候才去加载
    //饥饿模式在spring容器初始化完毕就加载ribbon client
    @Autowired
    private RibbonEagerLoadProperties ribbonEagerLoadProperties;

    //子容器工厂,一个项目可以配置多个ribbon client,这些client分别加载自己的容器,互相独立
    @Bean
    public SpringClientFactory springClientFactory() {
        SpringClientFactory factory = new SpringClientFactory();
        factory.setConfigurations(this.configurations);
        return factory;
    }

    @Bean//引入RibbonLoadBalancerClient
    @ConditionalOnMissingBean({LoadBalancerClient.class})
    public LoadBalancerClient loadBalancerClient() {
        return new RibbonLoadBalancerClient(this.springClientFactory());
    }


    @Bean
    @ConditionalOnProperty(
            value = {"ribbon.eager-load.enabled"},//该属性开启饥饿模式
            matchIfMissing = false
    )
    //饥饿加载模式
    public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {
        return new RibbonApplicationContextInitializer(this.springClientFactory(), this.ribbonEagerLoadProperties.getClients());
    }

}

该类引入哪些功能:
1、LoadBalancerAutoConfiguration:给所有RestTemplate添加拦截器
2、

LoadBalancerAutoConfiguration:给所有RestTemplate添加拦截器

 @Configuration
@ConditionalOnClass({RestTemplate.class})
@ConditionalOnBean({LoadBalancerClient.class})
@EnableConfigurationProperties({LoadBalancerRetryProperties.class})
public class LoadBalancerAutoConfiguration {
    //这里配合@LoadBalanced注解的@Qualifier注解,将所有包含@LoadBalanced注解的RestTemplate给注入进来
    @LoadBalanced
    @Autowired(
            required = false
    )
    private List<RestTemplate> restTemplates = Collections.emptyList();
    @Autowired(
            required = false
    )
    private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

    public LoadBalancerAutoConfiguration() {
    }

    /***
     * 此方法的逻辑
     * 获取到所有的RestTemplate,把这些RestTemplate加入到自定义定制器中
     * @param customizers
     * @return
     */
    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializer(final List<RestTemplateCustomizer> customizers) {
        return new SmartInitializingSingleton() {
            public void afterSingletonsInstantiated() {
                Iterator var1 = org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration.this.restTemplates.iterator();

                while(var1.hasNext()) {
                    RestTemplate restTemplate = (RestTemplate)var1.next();
                    Iterator var3 = customizers.iterator();

                    while(var3.hasNext()) {
                        RestTemplateCustomizer customizer = (RestTemplateCustomizer)var3.next();
                        customizer.customize(restTemplate);
                    }
                }

            }
        };
    }

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
    }


    @Configuration
    @ConditionalOnMissingClass({"org.springframework.retry.support.RetryTemplate"})
    static class LoadBalancerInterceptorConfig {
        LoadBalancerInterceptorConfig() {
        }

        //初始化拦截器
        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        //初始化自定义定制器:该定制器会将放入到restTemplate都加上拦截器
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
            return new RestTemplateCustomizer() {
                public void customize(RestTemplate restTemplate) {
                    List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
                    list.add(loadBalancerInterceptor);
                    restTemplate.setInterceptors(list);
                }
            };
        }
    }
}

获取所有restTempalte,遍历挨个添加拦截器LoadBalancerInterceptor。

看一下拦截器LoadBalancerInterceptor是啥样的。

//LoadBalancerInterceptor
    private LoadBalancerClient loadBalancer;

    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
        URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        //入口
        return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
    }
//RibbonLoadBalancerClient
     public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        //获取负载均衡器
        ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
        //通过负载均衡器选主一个server
        Server server = this.getServer(loadBalancer);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        } else {
            RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
            return this.execute(serviceId, ribbonServer, request);
        }
    }

SpringClientFactory子容器

在自动配置类RibbonAutoConfiguration中有SpringClientFactory的加载,那么这个类是做什么的呢?
一个项目中可以配置多个ribbon client,这些client的配置等信息是不一样的,那么这些client加载到容器中是怎么处理的呢?
通过子容器,每个ribbon client有自己的容器,互相隔离,通过serviceId获取子容器中的IClient、ILoadBalancer、IClientConfig、RibbonLoadBalancerContext、AnnotationConfigApplicationContext。


public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> {
	static final String NAMESPACE = "ribbon";
	public SpringClientFactory() {
		//RibbonClientConfiguration后面会介绍
		super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
	}
	/**
	 * 根据serviceId获取rest client
	 * FeignLoadBalancer就是它的实现类,还有RibbonLoadBalancingHttpClient(HttpClient)、OkHttpLoadBalancingClient(OkHttp)
	 */
	public <C extends IClient<?, ?>> C getClient(String name, Class<C> clientClass) {
		return getInstance(name, clientClass);
	}
 
	/**
	 * 根据serviceId获取ILoadBalancer(上面讲过)
	 */
	public ILoadBalancer getLoadBalancer(String name) {
		return getInstance(name, ILoadBalancer.class);
	}
 
	/**
	 * 根据serviceId获取IClientConfig(上面讲过)
	 */
	public IClientConfig getClientConfig(String name) {
		return getInstance(name, IClientConfig.class);
	}
 
	/**
	 * 根据serviceId获取RibbonLoadBalancerContext
	 * FeignLoadBalancer有继承RibbonLoadBalancerContext
	 */
	public RibbonLoadBalancerContext getLoadBalancerContext(String serviceId) {
		return getInstance(serviceId, RibbonLoadBalancerContext.class);
	}
	static <C> C instantiateWithConfig(Class<C> clazz, IClientConfig config) {
		return instantiateWithConfig(null, clazz, config);
	}
	static <C> C instantiateWithConfig(AnnotationConfigApplicationContext context,
										Class<C> clazz, IClientConfig config) {
		C result = null;
		
		try {
			Constructor<C> constructor = clazz.getConstructor(IClientConfig.class);
			result = constructor.newInstance(config);
		} catch (Throwable e) {
			// Ignored
		}		
		if (result == null) {
			result = BeanUtils.instantiate(clazz);
			
			if (result instanceof IClientConfigAware) {
				((IClientConfigAware) result).initWithNiwsConfig(config);
			}
			
			if (context != null) {
				context.getAutowireCapableBeanFactory().autowireBean(result);
			}
		}
		
		return result;
	}
	@Override
	public <C> C getInstance(String name, Class<C> type) {
		C instance = super.getInstance(name, type);
		if (instance != null) {
			return instance;
		}
		IClientConfig config = getInstance(name, IClientConfig.class);
		return instantiateWithConfig(getContext(name), type, config);
	}
	//根据serviceId获取Ioc容器(AnnotationConfigApplicationContext)
	@Override
	protected AnnotationConfigApplicationContext getContext(String name) {
		return super.getContext(name);
	}
}
//与上面的是继承关系
public abstract class NamedContextFactory<C extends NamedContextFactory.Specification> implements DisposableBean, ApplicationContextAware {
    //所有的子容器映射关系
    private Map<String, AnnotationConfigApplicationContext> contexts = new ConcurrentHashMap();
    //获取一个子容器
    protected AnnotationConfigApplicationContext getContext(String name) {
        if (!this.contexts.containsKey(name)) {
            Map var2 = this.contexts;
            synchronized(this.contexts) {
                if (!this.contexts.containsKey(name)) {
                    //创建子容器
                    this.contexts.put(name, this.createContext(name));
                }
            }
        }
        return (AnnotationConfigApplicationContext)this.contexts.get(name);
    }
    //创建一个子容器
    protected AnnotationConfigApplicationContext createContext(String name) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        if (this.configurations.containsKey(name)) {
            //注入@RibbonClient的configuration
            Class[] var3 = ((NamedContextFactory.Specification)this.configurations.get(name)).getConfiguration();
            int var4 = var3.length;
 
            for(int var5 = 0; var5 < var4; ++var5) {
                Class<?> configuration = var3[var5];
                context.register(new Class[]{configuration});
            }
        }
        Iterator var9 = this.configurations.entrySet().iterator();
        while(true) {
            Entry entry;
            do {
                if (!var9.hasNext()) {
                    //注入RibbonClientConfiguration与配置
                    context.register(new Class[]{PropertyPlaceholderAutoConfiguration.class, this.defaultConfigType});
                    context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(this.propertySourceName, Collections.singletonMap(this.propertyName, name)));
                    if (this.parent != null) {
                        context.setParent(this.parent);
                    }
 
                    context.refresh();
                    return context;
                }
                entry = (Entry)var9.next();
            } while(!((String)entry.getKey()).startsWith("default."));
 
            Class[] var11 = ((NamedContextFactory.Specification)entry.getValue()).getConfiguration();
            int var12 = var11.length;
            for(int var7 = 0; var7 < var12; ++var7) {
                Class<?> configuration = var11[var7];
                context.register(new Class[]{configuration});
            }
        }
    }

}

RibbonClientConfiguration也很重要,每个子容器中都会注入RibbonClientConfiguration.
RibbonClientConfiguration中指出了ILoadBalancer的默认实现类为ZoneAwareLoadBalancer

返回顶部

hystrix

使用命令模式将所有对外部服务的调用包装在HystrixCommand或HystrixObservableCommand对象中,并将该对象放在单独的线程中执行。
命令模式
HystrixCommand:依赖的服务返回单个操作结果。封装了两种执行方法:execute()同步和queue()异步。
HystrixObservableCommand:依赖的服务返回多个操作结果。也封装了两种执行方法:observe()和toObserve()。observe()返回的是Hot Observe对象。toObserve()返回的是cold observe对象。
什么是Hot Observe?什么是cold observe?
Observable是RxJava中的概念,RxJava可以理解成发布订阅,其中Observable对象是发布者,每个Observable对应一个或多个订阅者。结合命令模式可以这么理解:RxJava在其中实现了一个Invoker的角色,通过发布订阅,将commond和reciever(命令执行者)连接起来。
Hot Observe:发布消息前无论当前Observable对象是否由订阅者,都进行消息发布。
cold observe:发布消息前如果当前Observable对象没有订阅者,则等待订阅者。

有两种使用方式,一种是通过注解,一种是通过自定义类实现HystrixCommand/HystrixObservableCommand。
首先在启动类通过注解@EnableHystrix开启断路器。

注解方式

@Component
public class UserService {

    @Autowired
    private RestTemplate restTemplate;

    /***
     * 通过注解方式
     * @return
     */
    @HystrixCommand(fallbackMethod = "fallback")//指定失败回调方法
    public User queryUserById() {
        return new RestTemplate().getForObject("http://localhost:8081/queryUserById/{1}", User.class,1L);
    }

    private String fallback() {
        return "fallback";
    }
}

自定义类

HystrixCommand:

/***
     * 通过自定义类 -- HystrixCommand
     * @return
     */
    public User queryUserByIdHystrix() throws Exception{
        //同步
        // User user  = new UserCommond(1L,restTemplate).execute();

        //异步
        Future<User> userFuture = new UserCommond(1L,restTemplate).queue();
        return userFuture.get();
    }

HystrixObservableCommand:一下发送多条请求


public class UserObserveCommond extends HystrixObservableCommand<User> {

    private RestTemplate restTemplate;
    private Long[] ids;

    public UserObserveCommond(Long[] ids , RestTemplate restTemplate) {
        super(HystrixCommandGroupKey.Factory.asKey("usercommand"));// 调用父类构造方法
        this.ids = ids;
        this.restTemplate = restTemplate;

    }

    @Override
    protected Observable<User> construct() {
        return Observable.create(new Observable.OnSubscribe<User>() {
            /*
             * Observable有三个关键的事件方法,分别为onNext,onCompleted,onError
             */
            @Override
            public void call(Subscriber<? super User> observer) {
                try {// 写业务逻辑,注意try-catch
                    if (!observer.isUnsubscribed()) {
                        for (Long id : ids) {
                            User user = restTemplate.getForObject("http://localhost:8081/queryUserById/{1}", User.class,id);
                            observer.onNext(user);
                        }
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        }).subscribeOn(Schedulers.io());
    }
    /**
     * fallback方法的写法,覆写resumeWithFallback方法
     * 当调用出现异常时,会调用该降级方法
     */
    @Override
    public Observable<User> resumeWithFallback() {
        return Observable.create(new Observable.OnSubscribe<User>() {
            @Override
            public void call(Subscriber<? super User> observer) {
                try {
                    if (!observer.isUnsubscribed()) {
                        User u = new User();
                        u.setName("刘先生");
                        u.setId(1l);
                        observer.onNext(u);
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        }).subscribeOn(Schedulers.io());
    }
}
/***
     * 通过自定义类 -- HystrixObservableCommand
     *
     * 一下发送多条请求
     *
     * @return
     */
    public List<User> queryUserByIdHystrixObservable() throws Exception{
        List<User> list = new ArrayList<User>();
        Long[] ids = {1L,2L,3L};
        UserObserveCommond observableCommand = new UserObserveCommond(ids,restTemplate);
        Observable<User> observe = observableCommand.observe();

        observe.subscribe(new Observer<User>() {
            @Override
            public void onCompleted() {
                System.out.println("聚合完了所有的查询请求!");
                System.out.println(list);
            }
            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }
            @Override
            public void onNext(User user) {
                list.add(user);
            }
        });
        return list;
    }

开启缓存:重写如下方法即可

    @Override
    protected String getCacheKey() {
        return String.valueOf(id);
    }

源码

使用命令模式,每一个远程调用都会封装成一个commond对象,然后在通过切面,对该commond对象进行增强。
切面类是HystrixCommandAspect:


返回顶部

以上是关于spring cloud ribbon的主要内容,如果未能解决你的问题,请参考以下文章

0404-Ribbon通过代码自定义配置使用配置文件自定义Ribbon Client

笔记:Spring Cloud Feign Ribbon 配置

spring cloud 整合ribbon问题

Spring Cloud Ribbon 整合 Hystrix

Spring Cloud Alibaba - 10 Ribbon 自定义负载均衡策略(权重算法)

解决Spring Cloud中Feign/Ribbon第一次请求失败的方法