springcloud 微服务Spring Cloud Alibaba Sentinel使用详解
Posted 逆风飞翔的小叔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springcloud 微服务Spring Cloud Alibaba Sentinel使用详解相关的知识,希望对你有一定的参考价值。
目录
一、前言
对于一个线上运行的微服务架构系统来说,稳定性是非常重要的,稳定性体现在多个方面,其中衡量服务稳定性一个重要指标就是能够抗住外部瞬时高并发的流量带来的冲击,或者说恶意的流量对系统造成的危害,在SpringCloud微服务生态下,早期一个重要的组件hystrix就是用来做服务的熔断,降级,限流等,而在SpringCloud Alibaba中,出现了Sentinel这一升级版的专门用于分布式系统流量治理的组件,本篇就来详细介绍下Sentinel的使用。
二、分布式系统遇到的问题
当系统从单体到微服务拆分后,随着服务规模的增长,必然要面对的就是分布式系统带来的新的问题,下面具体聊聊这些问题。
2.1 服务可用性问题
衡量一个系统好坏最直接的标准就是这个系统是否可以做到任何情况下都可以对外提供服务,即大家口中的多少个9的问题,比如下面这张图,形象的说明了服务可用性的一些问题,造成服务不可用的因素有哪些呢?这里总结下面主要几点。
2.1.1 单点故障
时至今日,仍然有不少小的项目为节省服务部署带来的开销而单点部署,可以说这种服务是非常脆弱的,一旦某种意外发生,服务很容易挂掉。
2.1.2 流量飙升
对于某些系统来说,如果服务端的程序本身的健壮性不够,比如说对抗高并发考虑不足,一旦瞬时高并发的流量涌入,很容易将服务搞挂,要知道,搞垮你服务的可能不是真有那么多的用户在访问,而是一些恶意的请求或者来自竞争对手的小心思。
2.1.3 容错机制
可以这么说,很多系统在规模比较小的时候,不太会去思考如何对服务端的某些程序可以做容错处理的,比如说,当你的接口请求线程被打满的时候你会想过要做排队等待吗?或者说你调用的其他依赖服务超时了你会主动断开重试吗?更常用的,你能预估哪些接口是核心关键的服务而对他们做一些限流的逻辑吗?没有问题还好,一旦出了问题,往往都是很严重的问题,说到底,服务容错的意识是很重要的。
2.2 服务雪崩问题
以下图为例进行说明,这是一个典型的分布式应用架构,系统中有多个微服务应用,各个微服务之间存在互相的调用,甚至是依赖的调用,一个典型的场景是:
A依赖B,B依赖C,C依赖D...,如果某一天D服务因为某种原因不可用,请求从A进入,由于D挂了,导致前面的依赖请求都卡在那儿等待,后续进入A的请求越来越多,总有某个时刻从A开始,一直到D,大家的线程全被耗尽,这就是大家熟知的服务雪崩效应。服务雪崩的危害是很大的,一旦出现雪崩,将造成整个系统不可用。
三、 服务可用性解决方案
针对上面的对服务可用性带来的问题,经过多年的生产实践经验,业内也逐渐积累了一些通识的处理方案。
3.1 服务容错机制
对于系统中关键的服务,需要采用服务容错机制,确保关键服务的可用性,常见的服务容错机制有
3.1.1 超时机制
在不做任何处理的情况下,服务提供者不可用会导致消费者请求线程强制等待,而造成系统资源耗尽。加入超时机制,一旦超时,就释放资源。由于释放资源速度较快,一定程度上可以抑制资源耗尽的问题。
3.1.2 服务限流
前面提到,为了确保系统中某些关键的服务接口可用,避免被恶意的请求或者瞬时的流量打垮,即确保这些服务接口的可用性,可以考虑对这些接口进行限流,限流是一种简单快捷又有效的处理方式,能够起到立竿见影的效果。
3.1.3 隔离
隔离的原理
用户的请求将不再直接访问服务,而是通过线程池中的空闲线程来访问服务,如果线程池已满,则会进行降级 处理,用户的请求不会被阻塞,至少可以看到一个执行结果(例如返回友好的提示信息),而不是无休止的等待或者看到系统崩溃。
常用的隔离方式:
- 线程池隔离
- 信号量隔离
3.2 服务熔断
3.2.1 什么是服务熔断
远程服务不稳定或网络抖动时暂时关闭,就叫服务熔断。
现实世界的断路器大家肯定都很了解,断路器实时监控电路的情况,如果发现电路电流异常,就会跳闸,从而防止电路 被烧毁。
3.2.2 服务熔断思考
微服务下的断路可以如下理解
- 实时监测应用,如果发现在一定时间内失败次数/失败率达到一定阈值,就“跳闸”,断路器打开——此时,请求直接返回,而不去调用原本调用的逻辑;
- 跳闸一段时间后(例如10秒),断路器会进入半开状态,这是一个瞬间态,此时允许一次请求调用该调的逻辑,如果成功,则断路器关闭,应用正常调用;
- 如果调用依然不成功,断路器继续回到打开状态,过段时间再进入半开状态尝试——通过”跳闸“,应用可以保护自己,而且避免浪费资源;
- 通过这种半开的设计思想,可实现应用的“自我修复“;
所以,同样的道理,当依赖的服务有大量超时时,在让新的请求去访问根本没有意义,只会无畏的消耗现有资源。比如我们设置了超时时间为1s,如果短时间内有大量请求在1s内都得不到响应,就意味着这个服务出现了异常,此时就没有必要再让其他的请求去访问这个依赖了,这个时候就应该使用断路器避免资源浪费。
3.3 服务降级
有服务熔断,必然要有服务降级
所谓降级,就是当某个服务熔断之后,服务将不再被调用,此时客户端可以自己准备一个本地的fallback(回退)回调,返回一个缺省值。 例如:(备用接口/缓存/mock数据) 。这样做,虽然服务水平下降,但好歹可用,比直接挂掉要强,当然这也要看适合的业务场景。
四、 Sentinel介绍
4.1 Sentinel是什么
Sentinel 号称分布式系统的流量防卫兵,随着微服务的流行,服务和服务之间的稳定性变得越来越重要。源码地址:Sentinel源码地址 ,官方文档:Sentinel官方文档地址
Sentinel 是面向分布式服务架构的流量控制组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来帮助开发者保障微服务的稳定性。
4.2 Sentinel主要特征
4.2.1 丰富的应用场景
Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、实时熔断下游不可用应用等。
4.2.2 完备的实时监控
Sentinel 同时提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据,甚至500台以下规模的集群的汇总运行情况。
4.2.3 广泛的开源生态
4.2.4 完善的 SPI 扩展点Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。您只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。
Sentinel 提供简单易用、完善的 SPI 扩展点。您可以通过实现扩展点,快速的定制逻辑。例如定制规则管理、适配数据源等。
4.3 Sentinel与hystrix对比
说起Sentinel,很难不提hystrix,毕竟在Sentinel推出之前,springcloud体系下的hystrix在服务的熔断,降级等方面还是比较出色的,下面列举几点它们之间的差异对比。hystrix git地址:hystrix地址,从两者多维度的指标对比来看,想必大家也能明白为什么springcloud aliababa 最后选择了功能更加完备的Sentinel。
五、 Sentinel 各类流控规则
通过上面的理论介绍,对Sentinel有了一个较全面的认识,更深入的可以参考官网的文档,接下来将从代码层面对Sentinel的使用做详细的说明。
Sentinel 可以简单的分为 Sentinel 核心库和 Dashboard。核心库不依赖 Dashboard,但是结合 Dashboard 可以取得最好的效果。也就是说,Sentinel 提供了基本的SDK,最直接的,可以在你的工程中引入SDK,就能快速使用Sentinel 提供的各种规则了。
5.1 规则种类
为了满足开发中可能遇到的各种流控场景,Sentinel 提供了多种流控规则,只要引入其SDK,即可开箱即用,官方参考说明地址:规则说明地址
Sentinel 的所有规则都可以在内存态中动态地查询及修改,修改之后立即生效。同时 Sentinel 也提供相关 API,供您来定制自己的规则策略。
5.1.1 Sentinel 支持的规则
Sentinel 支持以下几种规则:
- 流量控制规则;
- 熔断降级规则;
- 系统保护规则;
- 来源访问控制规则;
- 热点参数规则
5.2 Sentinel 流控规则
在Sentinel 提供的各类规则中,日常开发中,最常使用的就是流控规则了,关于流控规则相关的知识点,官方文档给出了下面的定义;
注意:同一个资源可以同时有多个限流规则,检查规则时会依次检查
5.2.1 代码示例
理解上面规则定义后,可以通过调用 方法来用硬编码的方式定义流量控制规则
private void initFlowQpsRule()
List<FlowRule> rules = new ArrayList<>();
//resourceName:被保护的资源名称
FlowRule rule = new FlowRule(resourceName);
// set limit qps to 20 阈值
rule.setCount(20);
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setLimitApp("default");
rules.add(rule);
//加载配置好的规则
FlowRuleManager.loadRules(rules);
5.3 Sentinel 熔断降级规则
当外部的请求或流量达到程序设置的阈值之后,可以直接对服务进行熔断或降级处理,Sentinel提供了完备的熔断降级规则,包含下面几个重要的属性
注意:同一个资源可以同时有多个降级规则
5.3.1 代码示例
理解上面规则定义后,可以通过调用 方法来用硬编码的方式定义流量控制规则
private void initDegradeRule()
List<DegradeRule> rules = new ArrayList<>();
DegradeRule rule = new DegradeRule();
rule.setResource(KEY);
// set threshold RT, 10 ms
rule.setCount(10);
rule.setGrade(RuleConstant.DEGRADE_GRADE_RT);
rule.setTimeWindow(10);
rules.add(rule);
DegradeRuleManager.loadRules(rules);
5.4 Sentinel 系统保护规则
Sentinel 系统自适应限流从整体维度对应用入口流量进行控制,结合应用的 Load、CPU 使用率、总体平均 RT、入口 QPS 和并发线程数等几个维度的监控指标,通过自适应的流控策略,让系统的入口流量和系统的负载达到一个平衡,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。
系统规则包含下面几个重要的属性:
注意:系统规则只针对入口资源(EntryType=IN)生效
5.4.1 代码示例
理解上面规则定义后,可以通过调用 方法来用硬编码的方式定义流量控制规则
private void initSystemRule()
List<SystemRule> rules = new ArrayList<>();
SystemRule rule = new SystemRule();
rule.setHighestSystemLoad(10);
rules.add(rule);
SystemRuleManager.loadRules(rules);
5.5 访问控制规则
很多时候,我们需要根据调用方来限制资源是否通过,这时候可以使用 Sentinel 的访问控制(黑白名单)的功能。黑白名单根据资源的请求来源()限制资源是否通过,若配置白名单则只有请求来源位于白名单内时才可通过;若配置黑名单则请求来源位于黑名单时不通过,其余的请求通过。
授权权规则(即黑白名单规则),主要有以下配置项:
- resource:资源名,即规则的作用对象;
- imitApp:对应的黑名单/白名单,不同 origin 用 分隔,如,appA,appB;
- strategy:限制模式, 为白名单模式, 为黑名单模式,默认为白名单模式AUTHORITY_WHITEAUTHORITY_BLACK;
5.6 定制化规则
面的规则配置,都是存在内存中的。即如果应用重启,这个规则就会失效。因此我们提供了开放的接口,您可以通过实现 DataSource 接口的方式,来自定义规则的存储数据源。通常我们的建议有:
- 整合动态配置系统,如 ZooKeeper、Nacos、Apollo 等,动态地实时刷新配置规则;
- 结合 RDBMS、NoSQL、VCS 等来实现该规则;
- 配合 Sentinel Dashboard 使用;
六、Sentinel 流控规则使用
以上总结了Sentinel 中常用的各类规则,其中在日常开发中,比较容易接触且使用较多的就是流控规则了,这一块也值得深入探究,因为理解了流控规则的原理,对后面使用dashboard的配置也就更深刻了,下面在代码中具体演示如何使用。
6.1 前置准备
导入如下的依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
6.2 QPS限流规则使用
使用Sentinel 进行编码的限流,只需要两步,第一步,编写流控规则并加载到环境,第二在具体的接口中声明式调用SDK中的限流方法,如下是完整的代码(结合注释以及上文的理论部分进行理解);
6.2.1 流控规则
其中使用了PostConstruct这个注解将规则在工程启动时全局加载;
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
@RestController
@Slf4j
public class SentinelController
private static final String RESOURCE_NAME = "demo";
private static final String USER_RESOURCE_NAME = "user";
private static final String DEGRADE_RESOURCE_NAME = "degrade";
@GetMapping("/demo")
public String demoRule()
Entry entry = null;
// 资源名可使用任意有业务语义的字符串,比如方法名、接口名或其它可唯一标识的字符串。
try
entry = SphU.entry(RESOURCE_NAME);
// 被保护的业务逻辑
String str = "hello sentinel";
return str;
catch (BlockException ex)
// 资源访问阻止,被限流或被降级
// 在此处进行相应的处理操作
log.error("blocked !");
return "被限流了";
catch (Exception e)
Tracer.traceEntry(e, entry);
finally
if (entry != null)
entry.exit();
return null;
@PostConstruct
private static void initFlowRule()
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
//设置受保护的资源
rule.setResource(RESOURCE_NAME);
//设置流控规则 QPS
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
//设置受保护的资源阈值
rule.setCount(1);
rules.add(rule);
//加载配置好的规则
FlowRuleManager.loadRules(rules);
6.2.2 功能测试
启动工程后,调用接口:localhost:8031/demo,由于流控规则中设置的是QPS每秒1个,可以正常响应;
如果快速刷接口,将看到下面的效果,即被限流了
小结
1、业务侵入性很强,需要在controller中写入非业务代码;
2、配置不灵活 若需要添加新的受保护资源 需要手动添加 init方法来添加流控规则;
6.3 SentinelResource 注解使用
鉴于上面的编码方式带来的使用上的不够遍历,Sentinel还提供了基于注解的方式,下面来演示下如何使用注解完成限流。
6.3.1 引入依赖
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-annotation-aspectj</artifactId>
<version>1.8.0</version>
</dependency>
6.3.2 使用SentinelResource对接口改造
@SentinelResource使用起来非常简单,只需要在需要限流的接口方法上面添加即可,里面补充必要的参数信息,SentinelResource注解官方使用说明文档:注解支持
主要使用步骤包括下面几点:
- 接口添加注解;
- 定义限流的资源名称;
- 指定限流后的方法(相当于容错);
完整代码
/**
* value :指定被限流的资源名称
* blockHandler:限流异常后的方法,也可以指定一个Class,在Class中指定异常方法
* @param userId
* @return
*/
@GetMapping("/user")
@SentinelResource(value = USER_RESOURCE_NAME,blockHandler = "blockHandlerForGetUser")
public User getUser(String userId)
User user = new User();
user.setUserId("0001");
user.setName("jerry");
return user;
public User blockHandlerForGetUser(String id,BlockException e)
e.printStackTrace();
User user = new User();
user.setUserId("mock");
user.setName("mock_user");
return user;
@PostConstruct
private static void initFlowRule()
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
//设置受保护的资源
rule.setResource(RESOURCE_NAME);
//设置流控规则 QPS
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
//设置受保护的资源阈值
rule.setCount(1);
rules.add(rule);
//将对应的需要限流的资源加入到限流规则中
FlowRule rule2 = new FlowRule();
//设置受保护的资源
rule2.setResource(USER_RESOURCE_NAME);
//设置流控规则 QPS
rule2.setGrade(RuleConstant.FLOW_GRADE_QPS);
//设置受保护的资源阈值
rule2.setCount(1);
rules.add(rule2);
//加载配置好的规则
FlowRuleManager.loadRules(rules);
再次测试,快速刷接口,仍然能够看到被限流的效果
6.3.3 异常处理
在某些情况下,如果接口出现了异常,为了给到界面返回一个降级处理的结果,可以在注解中配置fallback的属性,这个和blockHandler用法很像,只需要自定义一个方法并配置到fallback属性值即可,仍然以上面的接口为例,下面是完整的代码;
@GetMapping("/user")
@SentinelResource(value = USER_RESOURCE_NAME,blockHandler = "blockHandlerForGetUser",fallback = "fallbackHandlerForGetUser")
public User getUser(String userId)
int a = 1/0;
User user = new User();
user.setUserId("0001");
user.setName("jerry");
return user;
public User fallbackHandlerForGetUser(String userId,Throwable e)
e.printStackTrace();
return new User("exception","exception_user");
再次调用接口,可以看到下面的效果,即异常后的返回结果;
七、Sentinel 降级规则使用
除了流量控制以外,对调用链路中不稳定的资源进行熔断降级也是保障高可用的重要措施之一。一个服务常常会调用别的模块,可能是另外的一个远程服务、数据库,或者第三方 API 等。
Sentinel 熔断降级会在调用链路中某个资源出现不稳定状态时(例如:调用超时或异常比例升高),对这个资源的调用进行限制,让请求快速失败,避免影响到其他的资源而导致级联错误。
上图是一个典型的微服务之间调用链路的场景,可以说现代微服务架构都是分布式的,由非常多的服务组成。不同服务之间相互调用,组成复杂的调用链路。链路调用中任何一个环节出现问题将会产生放大的效果。复杂链路上的某一环不稳定,就可能会层层级联,最终导致整个链路都不可用。因此我们需要对不稳定的弱依赖服务调用进行熔断降级,暂时切断不稳定调用,避免局部不稳定因素导致整体的雪崩。熔断降级作为保护自身的手段,通常在客户端(调用端)进行配置。
降级规则相关属性
7.1 Sentinel 降级规则使用
关于规则中的各个参数的含义,可以结合上图进行理解,和上面的流控使用大同小异;
7.1.1 设置降级规则
/**
* 降级规则设置
*/
@PostConstruct
private static void initDegradeRule()
List<DegradeRule> rules = new ArrayList<>();
DegradeRule rule = new DegradeRule();
//设置受保护的资源
rule.setResource(DEGRADE_RESOURCE_NAME);
//设置规则:异常率
rule.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT);
//触发熔断异常数:2
rule.setCount(2);
//触发熔断最小请求数 【2次请求中2次异常时将会触发规则】
rule.setMinRequestAmount(2);
//统计时长,在多长时间的窗口内进行规则的统计
rule.setStatIntervalMs(60*1000*60);
//熔断后持续的时长,这个时间内的请求都会被降级处理
rule.setTimeWindow(10);
rules.add(rule);
//加载配置好的规则
DegradeRuleManager.loadRules(rules);
7.1.2 定义接口
@GetMapping("/degrade")
@SentinelResource(value = DEGRADE_RESOURCE_NAME,entryType = EntryType.IN,blockHandler = "blockHandlerForFb")
public User degrade(String userId) throws InterruptedException
throw new RuntimeException("异常了");
public User blockHandlerForFb(String userId,BlockException b)
return new User("degrade_user","degrade_user");
7.1.3 接口测试
启动工程后,第一次调用接口将出现下面的效果;
接着连续快速调用将出现下面的效果,即触发了降级的规则,走了异常后降级的逻辑;
八、Sentinel 控制台
上面通过编码的方式演示了Sentinel 的各种规则的使用,事实上从生产运维来说,使用编码的方式进行控制可能要求还是比较高的,有没有更简便的方式呢?Sentinel 官方还提供了控制台,通过控制台,可以更简单方便的对其内部提供的各类规则,比如流控、降级规则等进行使用。
8.1 部署dashboard
8.1.1 获取安装包
jar包git下载地址:控制台下载地址,里面提供了各个版本的下载地址,需要结合当前工程的springboot以及springcloud-alibaba 的版本选择合适的版本下载;
window环境下,选择第一个下载即可
8.1.2 启动dashboard
下载后的包,直接使用java -jar就可以启动了
启动效果
8.1.3 访问界面
启动完成后,直接使用8080端口就可以访问了
默认登录账户名和密码为:sentinel /sentinel,登录进去之后效果如下;
为什么是一个空空如也的界面呢?这是因为sentinel的dashboard默认是懒加载机制,只有当有请求被拦截后才会进行展示。后面将会具体说明。
8.2 与springboot整合
有了控制台之后,就可以对现有的工程进行控制台的接入了
8.2.1 引入依赖
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-sentinel</artifactId>
</dependency>
8.2.2 添加yml配置
server:
port: 8031
spring:
application:
name: service-sentinel
#注册中心使用nacos
cloud:
nacos:
discovery:
server-addr: localhost:8848
#连接sentinel的dashboard
sentinel:
transport:
dashboard: localhost:8080
#暴露的健康检查服务端点
management:
endpoints:
web:
exposure:
include: '*'
8.2.3 访问接口
再次启动工程后,访问一下工程中的某个接口之后,再次刷新dashboard,此时就可以看到工程中的所有接口了,针对这些接口,可以结合官方手册,对其中的接口进行细腻度的流控、降级等策略配置;
8.2.4 dashboard 限流规则配置
接下来简单演示下dashboard的流控使用,首先在工程中增加一个接口
@GetMapping("/flow")
public String flow()
return "flow test";
访问一下接口之后,就可以在dashboard中看到这个接口地址了;
对该接口设置限流规则QPS为1,即每秒只允许一个请求访问
设置并保存之后,按照之前的方式请求接口,如果连续快速的请求,将看到下面的效果,说明dashboard上面配置的限流规则生效了;
九、写在结尾
本篇通过较大的篇幅详细介绍了Sentinel的使用,从SDK中流控规则、降级规则的使用,到通过部署dashboard的使用,Sentinel作为Spring Cloud Alibaba微服务架构下的重要组件,在系统架构的设计中可以说担任着非常关键的角色,尤其是其内部的设计思想以及限流算法,对于开发者来说,具有很好的学习和借鉴,有兴趣的同学可以深入源码,深入学习。
译:基于Spring Cloud Stream构建和测试 message-driven 微服务
原文链接:https://piotrminkowski.wordpress.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/ 作者: Piotr Mińkowski 译者: helloworldtang img Spring Boot和Spring Cloud为您提供了一个利用不同的通信方式快速构建微服务的解决方案。您可以基于Spring Cloud Netflix库创建同步REST微服务,正如我在之前的一篇文章中所展示的那样 使用Spring Boot 2.0, Eureka and Spring Cloud快速搭建微服务指南。您可以使用Spring WebFlux项目在Netty上创建异步的、响应式的微服务,并将其与一些Spring Cloud库相结合,如我的文章所示 使用Spring WebFlux and Spring Cloud搭建响应式微服务。最后,您可以使用Spring Cloud Stream和类似Apache Kafka或RabbitMQ这样的broker来实现基于发布/订阅模型的message-driven微服务。构建微服务的最后一种方法是本文的主要主题。我将向您展示如何在RabbitMQ broker的基础上有效地构建、扩展、运行和测试消息传递微服务。 体系结构 为了演示Spring Cloud Stream的特性,我们将设计一个示例系统,该系统使用发布/订阅模型进行跨服务通信。我们有三个微服务:order-service、product-service和account-service。应用程序order-service暴露了负责处理发送到我们系统的订单的HTTP endpoint。所有传入的订单都是异步处理的——order-service准备并发送消息到RabbitMQ exchange,然后就对调用的客户端进行响应,不需要等到消息被消费后再响应。应用程序的account-service和product-service正在侦听进入该RabbitMQ exchange的订单消息。微服务account-service负责检查客户账户是否有足够的资金来支付该订单需要的金额,如果有就从该账户扣款。微服务product-service检查是否有足够的库存,并在处理订单后改变可用产品的数量。account-service 和 product-service 都通过RabbitMQ exchange(这一次是使用direct exchange的一对一通信)发送带有操作状态的异步响应。微服务 order-service根据接收到的响应消息来更新订单状态,并通过REST endpoint GET /order/{id}提供给外部客户端。 如果您觉得我们的示例描述有点难以理解,这里有一个用于澄清的架构图。 stream-1 启用 Spring Cloud Stream 在项目中使用Spring Cloud Stream的推荐方法是使用依赖管理系统。Spring Cloud Stream有一个与整个Spring Cloud framework相关,并且独立发布的依赖管理。然而,如果我们已经在Elmhurst.RELEASE版本的dependencyManagement部分声明了spring-cloud-dependencies,就不需要在pom.xml中声明任何其他内容。如果您喜欢只使用Spring Cloud Stream项目,那么您应该定义以下部分。 下一步是将spring-cloud-streamartifact添加到项目依赖项中。我还建议您至少包括spring-cloud-sleuth 库,以提供作为源请求进入order-service 的发送消息用的traceId。 Spring Cloud Stream 编程模型 为了使您的应用程序能够连接到一个message broker,请在主类上使用@EnableBinding注解。 @EnableBinding注解将一个或多个接口作为参数。您可以在Spring Cloud Stream提供的三个接口之间进行选择: Sink:这是用来标记从入站通道接收消息的服务。 Source: 这是用来向出站通道发送消息的。 Processor:当你需要一个入站通道和一个出站通道时,它可以被使用,因为它继承了Source and Sink接口。因为order-service发送消息,并接收它们,它的主类已经使用了@EnableBinding(Processor.class)注解。 下面是order-service项目中启用了Spring Cloud Stream binding的主类。 @SpringBootApplication @EnableBinding(Processor.class) public class OrderApplication { ... public static void main(String[] args) { new SpringApplicationBuilder(OrderApplication.class).web(true).run(args); } ... } 增加 message broker 在Spring Cloud Stream术语中,负责与特定message broker集成的实现称为binder。默认情况下,Spring Cloud Stream为 Kafka and RabbitMQ提供了binder实现。它能够自动检测和在类路径上查找binder。任何特定于中间件的设置都可以通过Spring Boot支持的外部配置属性来覆盖,譬如应用程序参数、环境变量,或者仅仅是application.yml文件。为了包含对RabbitMQ的支持,RabbitMQ将这篇文章用作message broker,您应该向项目添加以下依赖项。 现在,我们的应用程序需要连接RabbitMQ broker的一个共享实例。这就是为什么我使用RabbitMQ在默认的5672端口上运行Docker镜像。它还可以在地址http://192.168.99.100:15672(http://192.168.99.100:15672/)下启动web仪表板。 我们需要通过设置属性 spring.rabbitmq.host为Docker机器IP 192.168.99.100 ,来覆盖Spring Boot application的中的默认设置。 实现消息驱动的微服务 Spring Cloud Stream是在Spring Integration项目之上构建的。Spring Integration扩展了Spring编程模型,以支持众所周知的企业集成模式(EIP)。EIP定义了许多在分布式系统中经常使用的经典组件。您可能已经听说过诸如消息通道、路由器、聚合器或endpoints之类的模式。让我们回到上面的例子。让我们从order-service开始,它负责接收订单,将它们发布在shared topic上,然后从下游服务收集异步响应。下面是@service,它使用Sourcebean来构建消息并将其发布到远程topic。 这个 @Service 是由controller调用,controller暴露提交新订单和通过 id获得订单状态的HTTP endpoints。 现在,让我们更仔细地看看消费端。来自order-service的OrderSender bean所发送的消息是由 account-service和product-service接收。为了从 topic exchange中接收消息,我们只需要在入参为Order的方法上添加 @StreamListener注解。我们还必须为监听器定义目标通道——在这种情况下,它是Processor.INPUT。譬如: @StreamListener(Processor.INPUT) public void receiveOrder(Order order) throws JsonProcessingException { LOGGER.info("Order received: {}", mapper.writeValueAsString(order)); service.process(order); } 接收订单由AccountServicebean处理。account-service会根据客户账户上是否有足够的资金来实现订单接受或拒绝订单。验收状态的响应通过OrderSenderbean调用的输出通道发回order-service 。 最后一步是配置。它是在 application.yml中提供的。我们必须正确地定义通道的destination。而order-service则将orders-outdestination分配给输出通道,而orders-indestination则是输入通道,account-service和 product-service则恰恰相反。这是合乎逻辑的,因为通过其输出destination通过 order-service发送的消息是通过其输入destination接收的服务接收的。但在shared broker’s exchange中,它仍然是相同的destination。下面是 order-service的配置设置。 spring: application: name: order-service rabbitmq: host: 192.168.99.100 port: 5672 cloud: stream: bindings: output: destination: orders-out producer: partitionKeyExpression: payload.customerId partitionCount: 2 input: destination: orders-in rabbit: bindings: input: consumer: exchangeType: direct 这是为 account-service和product-service提供的配置。 最后,您可以运行上面示例中的微服务。现在,我们只需要运行每个微服务的单个实例。您可以通过运行JUnit测试类OrderControllerTest来轻松地生成一些测试请求,这是在我的源代码库中提供的 order-service中提供的。这种情况下很简单。在下一篇文章中,我们将学习更高级的示例,其中包含多个正在运行的消费服务实例。 扩展 为了扩展我们的Spring Cloud Stream应用程序,我们只需要启动每个微服务的附加实例。他们仍然会侦听与当前正在运行的实例相同的 topic exchange 中的传入消息。在添加了一个 account-service和 product-service的实例之后,我们可以发送一个测试订单。这个测试的结果对我们来说是不令人满意的… 为什么?每个微服务运行的所有实例都接收到了这个订单。这正是 topic exchanges 的工作方式——发送到topic的消息被所有的消费者接收,他们正在侦听这个topic。幸运的是,Spring Cloud Stream能够通过提供称为 consumer group的解决方案来解决这个问题。它负责保证一个消息只被一个实例处理,如果它们被放置在一个相互竞争的消费者关系中。在运行多项服务实例时,对consumer group机制的转换已经在下图中可视化了。 stream-2 一个 consumer group 机制的配置不是很困难。我们只需要设定group参数,并给出给定destination的组名。下面是account-service的当前binding配置。orders-indestination地是一个为直接与order-service通信而创建的队列,因此只有orders-out被分组使用spring.cloud.stream.bindings..group属性。 Consumer group机制是Apache Kafka的一个概念,它也在Spring Cloud Stream中实现,也适用于RabbitMQ broker,它本身并不支持它。因此,我认为它在RabbitMQ上的配置非常有趣。如果您在destination运行两个服务实例,而没有在destination设置组名,那么就会有两个为单个交易所创建的bindings(每个实例一个bindings),如下图所示。因为有两个应用程序在这个exchange中监听,总共有四个binding分配给那个exchange。 stream-3 如果您为选定的destination Spring Cloud Stream设置组名,则将为给定服务的所有运行实例创建单一binding。binding的名称将以组名为后缀。 B08597_11_06 因为,我们已经在项目依赖项中包含了 spring-cloud-starter-sleuth ,在实现 order-service POST endpoint的单个请求时,在交换的所有异步请求之间发送相同的 traceId 头部。由于这个原因,我们可以使用Elastic Stack (Kibana)轻松地将所有日志关联起来。 B08597_11_05 自动化测试 您可以轻松地测试您的微服务,而不需要连接到message broker。要实现它,您需要将 spring-cloud-stream-test-support包含到您的项目依赖项中。它包含 TestSupportBinderbean,它允许您与绑定通道进行交互,并检查应用程序发送和接收的任何消息。 在测试类中,我们需要声明 MessageCollectorbean,它负责接收由TestSupportBinder保留的消息。这是我的account-service测试类。使用Processorbean,我将测试订单发送到输入通道。然后,MessageCollector接收到通过输出通道发送回 order-service 的消息。测试方法的 testAccepted创建了应该被帐户服务接受的顺序,而testRejected方法则设置了过高的订单价格,从而导致拒绝订单。 总结 当您不需要来自API的同步响应时,Message-driven的微服务是一个不错的选择。在本文中,我展示了在您的微服务之间的跨服务通信中发布/订阅模型的示例用例。源代码在GitHub上是常见的(https://github.com/helloworldtang/sample-message-driven-microservices.git【原文源码maven不能运行,这个项目fork原代码并修复了错误】)。对于使用Spring Cloud Stream库、Apache Kafka的更有趣的例子,您可以参考我的书中第11章, Mastering Spring Cloud(https://www.packtpub.com/application-development/mastering-spring-cloud)。 关注社区公号,加入社区纯技术微信群以上是关于springcloud 微服务Spring Cloud Alibaba Sentinel使用详解的主要内容,如果未能解决你的问题,请参考以下文章