gateway&reactive(响应式流)函数编程的webflux

Posted 永不熄灭的火

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了gateway&reactive(响应式流)函数编程的webflux相关的知识,希望对你有一定的参考价值。

  springcloud.gateway是springcloud2的全新项目,该项目提供了一个构建在spring生态之上的API网关,包括spring5,springboot2,projectReactor。gateway旨在提高一种简单而有效的途径来转发请求,并为他们提供横切关注点,如安全性,监控/指标和弹性。在之前springcloud提供的网关是zull,zuul基于servlet2.5,使用阻塞架构,不支持长连接。zuul和negix相似,除了编程语言不同,zuul已经发布了zuul2,支持长连接,非阻塞,但是springcloud还没有整合。

项目构建:

  引入gateway和eureka依赖,gateway的网关提供了路由配置,路由断言,过滤器等功能。

一, 路由断言:

  路由断言接口由函数式接口RouterFunction接口提供支持,通过RouterFunctions的静态route方法来实现,如下:

 1 public static <T extends ServerResponse> RouterFunction<T> route(RequestPredicate predicate, HandlerFunction<T> handlerFunction) {
 2         return new RouterFunctions.DefaultRouterFunction(predicate, handlerFunction);
 3     }
 4 
 5 private static final class DefaultRouterFunction<T extends ServerResponse> extends RouterFunctions.AbstractRouterFunction<T> {
 6         private final RequestPredicate predicate;
 7         private final HandlerFunction<T> handlerFunction;
 8 
 9         public DefaultRouterFunction(RequestPredicate predicate, HandlerFunction<T> handlerFunction) {
10             super(null);
11             Assert.notNull(predicate, "Predicate must not be null");
12             Assert.notNull(handlerFunction, "HandlerFunction must not be null");
13             this.predicate = predicate;
14             this.handlerFunction = handlerFunction;
15         }
16 
17         public Mono<HandlerFunction<T>> route(ServerRequest request) {
18             if (this.predicate.test(request)) {
19                 if (RouterFunctions.logger.isTraceEnabled()) {
20                     String logPrefix = request.exchange().getLogPrefix();
21                     RouterFunctions.logger.trace(logPrefix + String.format("Matched %s", this.predicate));
22                 }
23 
24                 return Mono.just(this.handlerFunction);
25             } else {
26                 return Mono.empty();
27             }
28         }
29 
30         public void accept(RouterFunctions.Visitor visitor) {
31             visitor.route(this.predicate, this.handlerFunction);
32         }
33     }
    private abstract static class AbstractRouterFunction<T extends ServerResponse> implements RouterFunction<T> 

  而生成需要两个参数他们分别是:RequestPredicate  predicate和 HandlerFunction<T>  handlerFunction。

predicate由RequestPredicates.path()产生,handlerFunction是接口HandlerFunction<T>的函数实现类,该类只有一个方法:Mono<T> handle(ServerRequest var1)。

  所以添加路由断言的配置类方法如下:

1 @Bean//路由断言
2     public RouterFunction<ServerResponse> doRouterFunction(){
3         RouterFunction route = RouterFunctions.route(RequestPredicates.path("/test"),
4                 request -> ServerResponse.ok().body(BodyInserters.fromValue("this is ok")));
5         return route;
6     }

  此时路由断言的作用是当请求的路径是/test时,直接返回状态吗200,且响应体为this is ok。

为什么呢?

@FunctionalInterface
public interface HandlerFunction<T extends ServerResponse> {
    Mono<T> handle(ServerRequest var1);
}
下图的类都在org.springframework.web.reactive包下:

 

 从上面可以看出,serverResponse持有serverHttpResponse,而serverHttpResponse持有HttpServletResponse。所以这就是webflux的reactive响应式流编程实现流程。

 二,过滤器:

  网关经常要做的是对路由请求进行过滤,对符合条件的请求进行一些操作,如增加请求头,增加请求参数,增加响应头和断路器等功能。这个功能通过函数接口RouteLocator实现,RouteLocator通过RouteLocatorBuilder来获得:

 1   @Bean//过滤器
 2     public RouteLocator customRouteLocator(RouteLocatorBuilder builder){
 3         return builder.routes().route(
 4 
 5                 r -> r.path("/baidu")
 6                         .filters(f -> f.addResponseHeader("X-AnotherHeader","baz"))  
7
                  .uri("http://www.baidu.com")
8

9
    ).build();
10 }

RouteLocatorBuilder.Builder.routes就是路由表,如下图:

 具体类调用流程如下:

 1 public RouteLocatorBuilder.Builder routes() {
 2         return new RouteLocatorBuilder.Builder(this.context);
 3     }
 4 
 5 public RouteLocatorBuilder.Builder route(Function<PredicateSpec, AsyncBuilder> fn) {
 6             AsyncBuilder routeBuilder = (AsyncBuilder)fn.apply((new RouteLocatorBuilder.RouteSpec(this)).randomId());
 7             this.add(routeBuilder);
 8             return this;
 9         }
10 
11  void add(AsyncBuilder route) {
12             this.routes.add(route);
13         }
14 
15 public RouteLocator build() {
16             return () -> {
17                 return Flux.fromIterable(this.routes).map((routeBuilder) -> {
18                     return routeBuilder.build();
19                 });
20             };
21         }

  上面第5行的意思是Builder.route(Function<PredicateSpec, AsyncBuilder> fn)函数需要一个AsyncBuilder,这个AsyncBuilder是org.springframework.cloud.gateway.route.Route.AsyncBuilder,即route的异步构造器,第6行得到了这个异步构造器,第七,八行加到了Builder.routes集合中,并返回自己,然后调用自己的build方法,Flux是Reactive提供的返回结果工具类,原来返回User的话,那现在就返回Mono<User>;原来返回List<User>的话,那现在就返回Flux<User>。第18行调用build构造route。

   我们需要一个asyncbuilder,所以需要实现Function接口,如下:

  r -> r.path("/baidu").filters(f -> f.addRequestHeader("X-AnotherHeader","baz")).uri("http://www.baidu.com")
  上述方法体会最终得到一个asyncbuilder,然后开始上述带颜色部分的内容。上面的功能就是当请求路径中有
baidu,就在返回头上加上一个属性,然后转到该http://www.baidu.com地址。

 三,网关与服务中心结合

 1 spring:
 2   application:
 3     name: gateway #hystrix服务类型
 4   cloud:
 5     gateway:
 6       locator:
 7         enabled: true #开启gateway和eureka的结合
 8       routes:
 9         - id: service_eureka-service1 #id
10           uri: lb://eureka-service1 #服务
11           predicates:
12           - Path=/service_eureka-service1/**  #匹配路径
13           filters:
14           - StripPrefix=1 #使用路径过滤器去掉第一部分前缀 即去除service_eureka-service1这一部分

下面是service1的controller:

  

启动service1和gateway两个服务,注册到eureka,然后访问:

 ok,先到这里,源码后续再补充。

参考文章:https://blog.csdn.net/get_set/article/details/79480233 这兄弟的文章真不错。

以上是关于gateway&reactive(响应式流)函数编程的webflux的主要内容,如果未能解决你的问题,请参考以下文章

JVM平台上的响应式流(Reactive Streams)规范

JDK9新特性 Reactive Stream 响应式流

Reactive stream 响应式流——Webflux响应式编程利器

Reactive 响应式流与制奶厂业务

vue3手写reactive深的劫持深的监视深的响应数据

Unity基于响应式编程(Reactive programming)入门