如何在 Spring Boot 中将 Resilience4j 断路器与 WebFlux 一起使用

Posted

技术标签:

【中文标题】如何在 Spring Boot 中将 Resilience4j 断路器与 WebFlux 一起使用【英文标题】:How to use Resilience4j Circuit Breaker with WebFlux in Spring Boot 【发布时间】:2021-09-11 07:38:07 【问题描述】:

我有调用下游服务 B 的服务 A。

服务A代码

@RestController
@RequestMapping(value = "", produces = MediaType.APPLICATION_JSON_VALUE)
public class GreetingController 

    private final GreetingService greetingService;

    public GreetingController(GreetingService greetingService)
        this.greetingService = greetingService;
    

    @GetMapping(value = "/greetings")
    public Mono<String> getGreetings() 
        return greetingService.callServiceB();
    


@Component
@RequiredArgsConstructor
public class GreetingService 
    
    CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("greetingService");
    Callable<Mono<String>> callable = CircuitBreaker.decorateCallable(circuitBreaker, this::clientCall);
    Future<Mono<String>> future = Executors.newSingleThreadExecutor().submit(callable);

    public Mono<String> callServiceB() 
        try 
            return future.get();
         catch (CircuitBreakerOpenException | InterruptedException | ExecutionException ex)
            return Mono.just("Service is down!");
        
    


    private final String url = "/v1/holidaysgreetings";
    
    private Mono<String> clientCall()
        WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();
        
        return client
                .get()
                .uri(url)
                .retrieve()
                .bodyToMono(String.class);

当我关闭下游服务 B(在 localhost:8080 上运行)并点击 GreetingsController 类中的 /greetings 端点以查看我的断路器是否正常工作时,我得到了这个令人讨厌的错误

2021-06-28 21:27:31.431 ERROR 10285 --- [nio-8081-exec-7] o.a.c.c.C.[.[.[.[dispatcherServlet]: Servlet.service() for servlet [dispatcherServlet] in context with path [/v1/holidaysgreetings] 
threw exception [Request processing failed; nested exception is org.springframework.web.reactive.function.client.WebClientRequestException: Connection refused: localhost/127.0.0.1:8080; 
nested exception is io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8080] with root cause

java.net.ConnectException: Connection refused

有人知道我为什么会得到这个吗?我在这里缺少什么?我是否正确实施了断路器?

【问题讨论】:

你应该改用resilience4j的reactor支持:resilience4j.readme.io/docs/examples-1 【参考方案1】:

您将反应式库与常规的非反应式库混合在一起。如果您打算使用spring-webflux,最好将reactor-resilience4j 与常规reactor-adapter 库一起使用。

使用这些导入:

implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.cloud:spring-cloud-starter-circuitbreaker-reactor-resilience4j'
implementation "io.projectreactor.addons:reactor-adapter:$reactorVersion"

您也没有创建您可以依赖的断路器服务。创建它后,您可以调用“ Mono run(Mono toRun, Function<Throwable, Mono> fallback)”(如果您愿意,可以调用返回 Flux 的那个)来执行您的服务并提供回退。

这是来自a demo code 的一个示例。

@RestController
public class CompletableFutureDemoController 

    Logger LOG = LoggerFactory.getLogger(CompletableFutureDemoController.class);

    private CompletableFutureHttpBinService httpBin;
    private ReactiveCircuitBreakerFactory reactiveCircuitBreakerFactory;

    public CompletableFutureDemoController(CompletableFutureHttpBinService httpBin, ReactiveCircuitBreakerFactory reactiveCircuitBreakerFactory) 
        this.httpBin = httpBin;
        this.reactiveCircuitBreakerFactory = reactiveCircuitBreakerFactory;
    

    @GetMapping("/completablefuture/delay/seconds")
    public Mono<Map> delay(@PathVariable int seconds) 
        return reactiveCircuitBreakerFactory.create("completablefuturedelay")
            .run(Mono.fromFuture(httpBin.delay(seconds)), t -> 
                LOG.warn("delay call failed error", t);
                Map<String, String> fallback = new HashMap();
                fallback.put("hello", "world");
                return Mono.just(fallback);
            
        );
    

【讨论】:

以上是关于如何在 Spring Boot 中将 Resilience4j 断路器与 WebFlux 一起使用的主要内容,如果未能解决你的问题,请参考以下文章

如何在spring boot gradle项目中将时间戳附加到jar?

如何在 Wildfly 中将外部属性文件加载到 Spring Boot

如何在spring boot中将@Value属性从application.properties注入@InjectMocks?

如何在 Spring Boot 中将嵌套的 JSON 对象映射为 SQL 表行

如何在 Spring Boot 应用程序中将用户添加到嵌入式 tomcat?

如何在弹性beantalk中将PostgreSQL RDS连接到spring boot?