如何检查 Flux<Object> 是不是为空?

Posted

技术标签:

【中文标题】如何检查 Flux<Object> 是不是为空?【英文标题】:How to check Flux<Object> is empty or not?如何检查 Flux<Object> 是否为空? 【发布时间】:2021-03-15 05:23:37 【问题描述】:

我有一个 api 供 kubernetes 调用并检测服务是否可用。在该 api 中,首先调用一个接口获取其他服务的主机,该接口返回一个 Flux,如果结果为空 api 返回 SERVICE_UNAVAILABLE 其他返回好的。我当前的代码如下:

@GetMapping(value = "/gateway/readiness")
public Mono<Long> readiness(ServerHttpResponse response) 
    Flux<Map<String, List<String>>> hosts = hostProvider.getHosts();
    List<String> hostProviders = new ArrayList<>();
    
    // the below code has a warning: Calling subscribe in a non-blocking scope
    hosts.subscribe(new Consumer<Map<String, List<String>>>() 
        @Override
        public void accept(Map<String, List<String>> stringListMap) 
            hostProviders.addAll(stringListMap.keySet());
        
    );
    if (hostProviders.isEmpty()) 
        response.setStatusCode(HttpStatus.SERVICE_UNAVAILABLE);
    
    return routeLocator.getRoutes().count();

有优雅这样做吗?

【问题讨论】:

你不应该写一个map,然后改用defaultIfEmpty吗? 【参考方案1】:

请尝试以下方法:

@GetMapping(value = "/gateway/readiness")
public Mono<ServerResponse> readiness() 
    return hostProvider.getHosts()
            .map(Map::keySet)
            .flatMap(set -> Flux.fromStream(set.stream()))
            .collectList()
            .flatMap(hostProviders -> 
               // response whatever you want to the client
               ServerResponse.ok().bodyValue(routeLocator.getRoutes().count())
            )
            // if no value was propagated from the above then send 503 response code
            .switchIfEmpty(ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).build());

【讨论】:

【参考方案2】:

你应该像这样重写你的代码:

@GetMapping(value = "/gateway/readiness")
public Mono<ResponseEntity<Long>> readiness() 
    Flux<Map<String, List<String>>> hosts = Flux.empty();
    return hosts
            .flatMapIterable(Map::keySet)
            .distinct()
            .collectList()
            .map(ignored -> ResponseEntity.ok(1L))
            .defaultIfEmpty(ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build());

【讨论】:

我用这个方法得到一个异常:java.lang.NullPointerException: null

以上是关于如何检查 Flux<Object> 是不是为空?的主要内容,如果未能解决你的问题,请参考以下文章

反应器:Flux<object> .subscribe() 与 .toStream()

Flux.onErrorContinue 参数类型

如何有选择地跳过 Flux 上的几个处理步骤

如何处理 Flux.fromStream 中的异常抛出

如何构建避免嵌套 Flux 块(Flux<Flux <T>>)的反应式架构?

如何将 Flux<pojo> 转换为 ArrayList<String>