3.SpringCloud -- 服务调用负载均衡 RibbonOpenFeign

Posted 爱是与世界平行

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了3.SpringCloud -- 服务调用负载均衡 RibbonOpenFeign相关的知识,希望对你有一定的参考价值。

一、引入 服务调用、负载均衡

1.1 问题 与 解决

【问题:】
    在上一篇中,介绍了 Eureka、Zookeeper、Consul 作为注册中心,并使用 RestTemplate 进行服务调用。 详见:https://www.cnblogs.com/l-y-h/p/14193443.html
    那么是如何进行负载均衡的呢?
    
【解决:】
    在 @Bean 声明 RestTemplate 时,添加一个 @LoadBalanced,并使用 注册中心中 的服务名 作为 RestTemplate 的 URL 地址(ip、端口号)。
    就这么简单的两步,即可实现了负载均衡。
    
    那么这里面又涉及到什么技术知识呢?能不能更换负载均衡策略?能不能自定义负载均衡策略?
常用技术:
    Ribbon(维护状态,替代产品为 Loadbalancer)
    OpenFeign(推荐使用)
    
【说明:】
    此处以 Eureka 伪集群版创建的几个模块作为演示。代码地址:https://github.com/lyh-man/SpringCloudDemo
    服务注册中心:eureka_server_7001、eureka_server_7002、eureka_server_7003
    服务提供者:eureka_client_producer_8002、eureka_client_producer_8003、eureka_client_producer_8004
    服务消费者:eureka_client_consumer_9002 
注:
    主要还是在 服务消费者 上配置负载均衡策略(可以 Debug 模式启动看看执行流程),其他模块直接启动即可。

二、服务调用、负载均衡 – Ribbon

2.1 什么是 Ribbon?

【Ribbon:】
    Ribbon 是 Netflix 公司实现的一套基于 HTTP、TCP 的客户端负载均衡的工具。
    SpringCloud 已将其集成到 spring-cloud-netflix 中,实现 SpringCloud 的服务调用、负载均衡。
    Ribbon 提供了多种方式进行负载均衡(默认轮询),也可以自定义负载均衡方法。
    
注:
    Ribbon 虽然已进入维护模式,但是一时半会还不容易被完全淘汰,还是可以学习一下基本使用的。
    Ribbon 替代产品是 Loadbalancer。
    
【相关网址:】
    https://github.com/Netflix/ribbon
    http://jvm123.com/doc/springcloud/index.html#spring-cloud-ribbon

2.2 Ribbon 与 nginx 负载均衡区别

【负载均衡(Load Balance):】
    负载均衡指的是 将工作任务 按照某种规则 平均分摊到 多个操作单元上执行。
注:
    Web 项目的负载均衡,可以理解为:将用户请求 平均分摊到 多个服务器上处理,从而提高系统的并发度、可用性。

【负载均衡分类:】
按照软硬件划分:
    硬件负载均衡: 一般造价昂贵,但数据传输更加稳定。比如: F5 负载均衡。
    软件负载均衡: 一般采用某个代理组件,并使用 某种 负载均衡 算法实现(一种消息队列分发机制)。比如:Nginx、Ribbon。

按照负载均衡位置划分:
    集中式负载均衡:提供一个 独立的 负载均衡系统(可以是软件,比如:Nginx,可以是硬件,比如:F5)。
        通过此系统,将服务消费者的 请求 通过某种负载均衡策略 转发给 服务提供者。
        
    客户端负载均衡(进程式负载均衡):将负载均衡逻辑整合到 服务消费者中,服务消费者 定时同步获取到 服务提供者信息,并保存在本地。
        每次均从本地缓存中取得 服务提供者信息,并根据 某种负载均衡策略 将请求发给 服务提供者。
注:
    使用集中式负载均衡时,服务消费者 不知道 任何一个服务提供者的信息,只知道独立负载均衡设备的信息。
    使用客户端负载均衡时,服务消费者 知道 所有服务提供者的信息。

【Nginx 负载均衡:】
    Nginx 实现的是 集中式负载均衡,Nginx 接收 客户端所有请求,并将请求转发到不同的服务器进行处理。
    
【Ribbon 负载均衡:】
    Ribbon 实现的是 客户端负载均衡,从注册中心获得服务信息并缓存在本地,在本地进行 负载均衡。

2.3 更换 Ribbon 负载均衡规则(两种方式)

(1)引入依赖

【依赖:】
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>

一般使用 Ribbon 时需要引入上述依赖,但是对于 eureka 来说,其 eureka-client 依赖中已经集成了 ribbon 依赖,所以无需再次引入。

(2)Ribbon 提供的几种负载均衡算法
  Ribbon 提供了 IRule 接口,通过其可以设置并更换负载均衡规则。
  IRule 实质就是 根据某种负载均衡规则,从服务列表中选取一个需要访问的服务。
  一般默认使用 ZoneAvoidanceRule + RoundRobinRule。

IRule 子类如下:】
RoundRobinRule   
    轮询,按照服务列表顺序 循环选择服务。
    
RandomRule      
    随机,随机的从服务列表中选取服务。
    
RetryRule        
    重试,先按照轮询策略获取服务,若获取失败,则在指定时间进行重试,重新获取可用服务。
    
WeightedResponseTimeRule   
    加权响应时间,响应时间越低(即响应时间快),权重越高,越容易被选择。刚开始启动时,使用轮询策略。
    
BestAvailableRule          
    高可用,先过滤掉不可用服务(多次访问故障而处于断路器跳闸的服务),选择一个并发量最小的服务。

AvailabilityFilteringRule
    可用筛选,先过滤掉不可用服务 以及 并发量超过阈值的服务,对剩余服务按轮询策略访问。

ZoneAvoidanceRule
    区域回避,默认规则,综合判断服务所在区域的性能 以及 服务的可用性,过滤结果后采用轮询的方式选择结果。
IRule:】
package com.netflix.loadbalancer;

public interface IRule {
    Server choose(Object var1);

    void setLoadBalancer(ILoadBalancer var1);

    ILoadBalancer getLoadBalancer();
}

以 Dubug 模式 启动 eureka_client_consumer_9002,并在 IRule 接口 实现类的 choose() 方法上打上断点,发送请求时,将会进入断点,此时可以看到执行的 负载均衡规则。

不停的刷新页面,可以看到请求以轮询的方式被 服务提供者 处理。

(3)替换负载均衡规则(方式一:新建配置类)

【步骤一:】
    新建一个配置类(该类不能被 @ComponentScan 扫描到,即不能与 启动类 在同一个包下),并定义规则。
比如:
    package com.lyh.springcloud.customize;
    
    import com.netflix.loadbalancer.IRule;
    import com.netflix.loadbalancer.RandomRule;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class CustomizeLoadBalanceRule {
    
        @Bean
        public IRule customizeRule() {
            return new RandomRule();
        }
    }

【步骤二:】
    在启动类上添加 @RibbonClient 注解,并指定服务名 以及 规则。
比如:
    @RibbonClient(name = "EUREKA-CLIENT-PRODUCER", configuration = CustomizeLoadBalanceRule.class)

不停的刷新页面,可以看到请求以随机的方式被 服务提供者 处理,而非轮询。

(4)替换负载均衡规则(方式二:修改配置文件)
  在服务消费者 配置文件中 根据 服务提供者 服务名,
  通过 ribbon.NFLoadBalancerRuleClassName 指定负载均衡策略。
注:
  在后面的 OpenFeign 的使用中进行演示。

【举例:】
EUREKA-CLIENT-PRODUCER: # 服务提供者的服务名
  ribbon:
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule

2.4 轮询原理(RoundRobinRule)

(1)相关源码
  主要就是 choose()、incrementAndGetModulo() 这两个方法。
  choose() 用于选择 server 服务。
  incrementAndGetModulo() 用来决定 选择哪个服务,返回服务下标。

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }

    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();

        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }

        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);

        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }

        // Next.
        server = null;
    }

    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
    }
    return server;
}

private AtomicInteger nextServerCyclicCounter;
private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextServerCyclicCounter.get();
        int next = (current + 1) % modulo;
        if (nextServerCyclicCounter.compareAndSet(current, next))
            return next;
    }
}

(2)代码分析

【choose():】
    初始进入 choose() 方法,server 为 null 表示服务不存在,count 为 0 表示属于尝试第一次获取服务。
    进入 while 循环后,退出条件为 server 不为 null(即找到服务) 或者 count 大于等于 10 (即尝试了 10 次仍未找到服务)。
    而 获取 server 的核心在于获取 服务的下标,即 int nextServerIndex = incrementAndGetModulo(serverCount);

【incrementAndGetModulo()】
    核心就是 自旋 CAS 并取模。
    modulo 表示服务器总数,current 表示当前服务下标,next 表示下一个服务下标。
    compareAndSet() 即 CAS 实现,如果 内存中的值 与 current 相同,那么将内存中值改为 next,并返回 true,否则返回 false。
    即 compareAndSet 失败后,会不停的执行循环 以获取 最新的 current。
注:
    自旋、CAS 后面会讲到。CAS 保证原子性。
    其实就是一个公式: 第几次请求 % 服务器总数量 = 实际调用服务器下标位置

5、手写一个轮询算法

(1)说明

【说明:】
    创建一个与  eureka_client_consumer_9002 模块类似的模块 eureka_client_consumer_9005。
    修改配置文件,并去除 @LoadBalanced 注解(避免引起误解)。
    自己实现一个轮询算法(与 RoundRobinRule 类似)。
注:
    此处在 controller 中定义一个接口,用于测试 轮询的功能(仅供参考,可以继承 AbstractLoadBalancerRule,自行构造一个负载均衡类)。
    去除 @LoadBalanced 注解后,访问调用 RestTemplate 请求的接口时会报错(用于区分)。

(2)相关代码

模块创建此处省略(需要修改 pom.xml,配置文件)。
  详情请见上篇博客:https://www.cnblogs.com/l-y-h/p/14193443.html#_label2_3

面向接口编程,此处新建一个 LoadBalacner 接口,用于定义抽象方法(返回服务信息)。
  并定义一个 LoadBalacner 接口的实现类 LoadBalancerImpl。
  在 controller 中编写接口(服务发现),测试一下。

LoadBalacnerpackage com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance;

import org.springframework.cloud.client.ServiceInstance;

import java.util.List;

public interface LoadBalacner {
    /**
     * 从服务实例列表中获取出 服务实例
     */
    ServiceInstance getInstances(List<ServiceInstance> serviceInstances);
}LoadBalancerImplpackage com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance.impl;

import com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance.LoadBalacner;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Component
public class LoadBalancerImpl implements LoadBalacner {
    private AtomicInteger atomicInteger = new AtomicInteger();

    @Override
    public ServiceInstance getInstances(List<ServiceInstance> serviceInstances) {
        if (serviceInstances == null || serviceInstances.size() == 0) {
            return null;
        }
        return serviceInstances.get(incrementAndGetModulo(serviceInstances.size()));
    }

    public int incrementAndGetModulo(int count) {
        int current = 0;
        int next = 0;
        do {
            current = atomicInteger.get();
            next = (current >= Integer.MAX_VALUE ? 0 : current + 1) % count;
        } while (!atomicInteger.compareAndSet(current, next));
        return next;
    }
}ConsumerControllerpackage com.lyh.springcloud.eureka_client_consumer_9005.controller;

import com.lyh.springcloud.common.tools.Result;
import com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance.LoadBalacner;
import com.lyh.springcloud.eureka_client_consumer_9005.entity.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;

@RestController
@RequestMapping("/consumer/user")
public class ConsumerController {

    // 注意,此处 url 写死的,仅用于演示,实际项目中不能这么干。
//    public static final String PRODUCER_URL = "http://localhost:8001/producer/";
    // 通过服务名 找到  Eureka 注册中心真实访问的 地址
    public static final String PRODUCER_URL = "http://EUREKA-CLIENT-PRODUCER";

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private DiscoveryClient discoveryClient;

    @Autowired
    private LoadBalacner loadBalancer;

    @GetMapping("/loadBalance")
    public Result testLoadBalance() {
        ServiceInstance serviceInstance = loadBalancer.getInstances(discoveryClient.getInstances("EUREKA-CLIENT-PRODUCER"));
        if (serviceInstance == null) {
            return Result.error().message("服务不存在");
        }
        return Result.ok().data("HostAndPort", serviceInstance.getHost() + ":" + serviceInstance.getPort());
    }

    @GetMapping("/get/{id}")
    public Result getUser(@PathVariable Integer id) {
        return restTemplate.getForObject(PRODUCER_URL + "/producer/user/get/" + id, Result.class);
    }

    @PostMapping("/create")
    public Result createUser(@RequestBody User user) {
        return restTemplate.postForObject(PRODUCER_URL + "/producer/user/create", user, Result.class);
    }
}

三、补充知识

3.1 CAS

(1)什么是 CAS?

【CAS:】
    CAS 是 Compare And Swap 的缩写,即 比较交换。
    是一种无锁算法,在不加锁的情况下实现多线程之间变量同步,从而保证数据的原子性。
    属于硬件层面对并发操作的支持(CPU 原语)。
注:
    原子性:一个操作或多个操作要么全部执行且执行过程中不会被其他因素打断,要么全部不执行。
    原语:指的是若干条指令组成的程序段,实现特定的功能,执行过程中不能被中断(也即原子性)。
    
【基本流程:】
    CAS 操作包含三个操作数 —— 内存值(V)、预期原值(A)和新值(B)。
    如果内存里面的值 V 和 A 的值是一样的,那么就将内存里面的值更新成 B,
    若 V 与 A 不一致,则不操作(某些情况下,可以通过自旋操作,不断尝试修改数据直至成功修改)。
即
    V = V == A ? B : V;
或者
    for(;;) {
        V = getV();
        if (V == A) {
            V = B;
            break;
        }
    }

【缺点:(详见下面的 Atomic 类底层原理)】
    会出现 ABA 问题(两次读取数据时值相同,但不确定值是否被修改过)。
    使用自旋(死循环)CAS 时会占用系统资源、影响执行效率。
    每次只能对一个共享变量进行原子操作。

(2)原子性

【说明:】
    初始 i = 0,现有 10 个线程,分别执行 i++ 10000次,若不对 i++ 做任何限制,那么最终执行结果一般都是小于 100000 的。
    因为 A、B 执行 i++ 操作时,彼此会相互干扰,也即不能保证原子性。

【如何保证原子性:】
    可以给 i++ 操作加上 synchronized 进行同步控制,从而保证操作按照顺序执行、互不干扰。
    也可以使用 Atomic 相关类进行操作(核心是 自旋 CAS 操作 volatile 变量)。
注:
    synchronized 在 JDK1.6 之前,属于重量级锁,属于悲观锁的一种(在操作锁变量前就给对象加锁,而不管对象是否发生资源竞争),性能较差。
    在 JDK1.6 之后,对 synchronized 进行了优化,引入了 偏向锁、轻量级锁、采用 CAS 思想,提升了效率。
    
【CAS 与 synchronized 比较:】
CAS:
    CAS 属于无锁算法,可以支持多个线程并发修改,并发度高。
    CAS 每次只支持一个共享变量进行原子操作。
    CAS 会出现 ABA 问题。

synchronized:
    synchronized 一次只能允许一个线程修改,并发度低。
    synchronized 可以对多个共享变量进行原子操作。

【举例:】

package com.lyh.tree;

import java.util.concurrent.atomic.AtomicInteger;

public class Test {
    private int count = 0;

    private int count2 = 0;

    private AtomicInteger count3 = new AtomicInteger(0);

    /**
     * 普通方法
     */
    public void increment() {
        count++;
    }

    /**
     * 使用 synchronized 修饰的方法
     */
    public synchronized void increment2() {
        count2++;
    }

    /**
     * 使用 atomic 类的方法
     */
    public void increment3() {
        count3.getAndIncrement();
    }

    public static void main(String[] args) {
        // 实例化一个对象
        Test test = new Test();

        // 创建 10 个线程
        for (int i = 0; i < 10; i++) {
            // 每个线程内部均 执行 10000 次 三种 i++ 操作
            new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    // 普通方法 i++
                    test.increment();
                    // 添加 synchronized 关键字后的 i++
                    test.increment2();
                    // 使用 atomic 类的 i++
                    test.increment3();
                }
            }, "thread-" + "i").start();
        }

        // 等待 1 秒,确保上面线程可以执行完毕
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 输出三种 i++ 的最终结果
        System.out.println("普通方法 i++ 操作后最终值 i = " + test.count);
        System.out.println("添加 synchronized 关键字后的 i++ 操作后最终值 i = " + test.count2);
        System.out.println("使用 atomic 类的 i++ 操作后最终值 i = " + test.count3);
    }
}

3.2 Atomic 类底层原理

(1)Atomic 常用类有哪些?

以上是关于3.SpringCloud -- 服务调用负载均衡 RibbonOpenFeign的主要内容,如果未能解决你的问题,请参考以下文章

Ribbon负载均衡服务调用

Spring Cloud - 远程调用和负载均衡

Dubbo服务调用的动态代理和负载均衡

Ribbon负载均衡及Feign消费者调用服务

Ribbon负载均衡及Feign消费者调用服务

Ribbon负载均衡服务调用