使用 Resilience4j 框架实现重试机制
Posted 信码由缰
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用 Resilience4j 框架实现重试机制相关的知识,希望对你有一定的参考价值。
在本文中,我们将从快速介绍 Resilience4j 开始,然后深入探讨其 Retry 模块。我们将了解何时、如何使用它,以及它提供的功能。在此过程中,我们还将学习实现重试时的一些良好实践。
代码示例
本文在 GitHu 上附有工作代码示例。
什么是 Resilience4j?
当应用程序通过网络进行通信时,会有很多出错的情况。由于连接断开、网络故障、上游服务不可用等,操作可能会超时或失败。应用程序可能会相互过载、无响应甚至崩溃。
Resilience4j 是一个 Java 库,可以帮助我们构建弹性和容错的应用程序。它提供了一个框架,可编写代码以防止和处理此类问题。
Resilience4j 为 Java 8 及更高版本编写,适用于函数接口、lambda 表达式和方法引用等结构。
Resilience4j 模块
让我们快速浏览一下这些模块及其用途:
模块 | 目的 |
---|---|
Retry | 自动重试失败的远程操作 |
RateLimiter | 限制我们在一定时间内调用远程操作的次数 |
TimeLimiter | 调用远程操作时设置时间限制 |
Circuit Breaker | 当远程操作持续失败时,快速失败或执行默认操作 |
Bulkhead | 限制并发远程操作的数量 |
Cache | 存储昂贵的远程操作的结果 |
使用范式
虽然每个模块都有其抽象,但通常的使用范式如下:
- 创建一个 Resilience4j 配置对象
- 为此类配置创建一个 Registry 对象
- 从注册表创建或获取 Resilience4j 对象
- 将远程操作编码为 lambda 表达式或函数式接口或通常的 Java 方法
- 使用提供的辅助方法之一围绕第 4 步中的代码创建装饰器或包装器
- 调用装饰器方法来调用远程操作
步骤 1-5 通常在应用程序启动时完成一次。让我们看看重试模块的这些步骤:
RetryConfig config = RetryConfig.ofDefaults(); // ----> 1
RetryRegistry registry = RetryRegistry.of(config); // ----> 2
Retry retry = registry.retry("flightSearchService", config); // ----> 3
FlightSearchService searchService = new FlightSearchService();
SearchRequest request = new SearchRequest("NYC", "LAX", "07/21/2020");
Supplier<List<Flight>> flightSearchSupplier =
() -> searchService.searchFlights(request); // ----> 4
Supplier<List<Flight>> retryingFlightSearch =
Retry.decorateSupplier(retry, flightSearchSupplier); // ----> 5
System.out.println(retryingFlightSearch.get()); // ----> 6
什么时候使用重试?
远程操作可以是通过网络发出的任何请求。通常,它是以下之一:
- 向 REST 端点发送 HTTP 请求
- 调用远程过程 (RPC) 或 Web 服务
- 从数据存储(SQL/NoSQL 数据库、对象存储等)读取和写入数据
- 向消息代理(RabbitMQ/ActiveMQ/Kafka 等)发送和接收消息
当远程操作失败时,我们有两种选择——立即向我们的客户端返回错误,或者重试操作。如果重试成功,这对客户来说是件好事——他们甚至不必知道这是一个临时问题。
选择哪个选项取决于错误类型(瞬时或永久)、操作(幂等或非幂等)、客户端(人或应用程序)和用例。
暂时性错误是暂时的,通常,如果重试,操作很可能会成功。请求被上游服务限制、连接断开或由于某些服务暂时不可用而超时就是例子。
来自 REST API 的硬件故障或 404(未找到)响应是永久性错误的示例,重试无济于事。
如果我们想应用重试,操作必须是幂等的。假设远程服务接收并处理了我们的请求,但在发送响应时出现问题。在这种情况下,当我们重试时,我们不希望服务将请求视为新请求或返回意外错误(想想银行转账)。
重试会增加 API 的响应时间。如果客户端是另一个应用程序,如 cron 作业或守护进程,这可能不是问题。但是,如果是一个人,有时最好做出响应,快速失败并提供反馈,而不是在我们不断重试时让这个人等待。
对于某些关键用例,可靠性可能比响应时间更重要,即使客户是个人,我们也可能需要实现重试。银行转账或旅行社预订航班和旅行酒店的转账就是很好的例子 - 用户期望可靠性,而不是对此类用例的即时响应。我们可以通过立即通知用户我们已接受他们的请求并在完成后通知他们来做出响应。
使用 Resilience4j 重试模块
RetryRegistry
、RetryConfig
和 Retry
是 resilience4j-retry 中的主要抽象。RetryRegistry
是用于创建和管理 Retry
对象的工厂。RetryConfig
封装了诸如应该尝试重试多少次、尝试之间等待多长时间等配置。每个 Retry
对象都与一个 RetryConfig
相关联。 Retr
y 提供了辅助方法来为包含远程调用的函数式接口或 lambda 表达式创建装饰器。
让我们看看如何使用 retry 模块中可用的各种功能。假设我们正在为一家航空公司建立一个网站,以允许其客户搜索和预订航班。我们的服务与 FlightSearchService
类封装的远程服务通信。
简单重试
在简单重试中,如果在远程调用期间抛出 RuntimeException
,则重试该操作。 我们可以配置尝试次数、尝试之间等待多长时间等:
RetryConfig config = RetryConfig.custom()
.maxAttempts(3)
.waitDuration(Duration.of(2, SECONDS))
.build();
// Registry, Retry creation omitted
FlightSearchService service = new FlightSearchService();
SearchRequest request = new SearchRequest("NYC", "LAX", "07/31/2020");
Supplier<List<Flight>> flightSearchSupplier =
() -> service.searchFlights(request);
Supplier<List<Flight>> retryingFlightSearch =
Retry.decorateSupplier(retry, flightSearchSupplier);
System.out.println(retryingFlightSearch.get());
我们创建了一个 RetryConfig
,指定我们最多要重试 3 次,并在两次尝试之间等待 2 秒。如果我们改用 RetryConfig.ofDefaults()
方法,则将使用 3 次尝试和 500 毫秒等待持续时间的默认值。
我们将航班搜索调用表示为 lambda 表达式 - List<Flight>
的 Supplier
。Retry.decorateSupplier()
方法使用重试功能装饰此 Supplier
。最后,我们在装饰过的 Supplier
上调用 get()
方法来进行远程调用。
如果我们想创建一个装饰器并在代码库的不同位置重用它,我们将使用 decorateSupplier()
。如果我们想创建它并立即执行它,我们可以使用 executeSupplier()
实例方法代替:
List<Flight> flights = retry.executeSupplier(
() -> service.searchFlights(request));
这是显示第一个请求失败然后第二次尝试成功的示例输出:
Searching for flights; current time = 20:51:34 975
Operation failed
Searching for flights; current time = 20:51:36 985
Flight search successful
[Flight{flightNumber=XY 765, flightDate=07/31/2020, from=NYC, to=LAX}, ...]
在已检异常上重试
现在,假设我们要重试已检查和未检查的异常。假设我们正在调用FlightSearchService.searchFlightsThrowingException()
,它可以抛出一个已检查的 Exception
。由于 Supplier
不能抛出已检查的异常,我们会在这一行得到编译器错误:
Supplier<List<Flight>> flightSearchSupplier =
() -> service.searchFlightsThrowingException(request);
我们可能会尝试在 lambda 表达式中处理 Exception 并返回 Collections.emptyList()
,但这看起来不太好。更重要的是,由于我们自己捕获 Exception
,重试不再起作用:
ExceptionSupplier<List<Flight>> flightSearchSupplier = () -> {
try {
return service.searchFlightsThrowingException(request);
} catch (Exception e) {
// dont do this, this breaks the retry!
}
return Collections.emptyList();
};
那么当我们想要重试远程调用可能抛出的所有异常时,我们应该怎么做呢?我们可以使用Retry.decorateCheckedSupplier()
(或 executeCheckedSupplier()
实例方法)代替 Retry.decorateSupplier()
:
CheckedFunction0<List<Flight>> retryingFlightSearch =
Retry.decorateCheckedSupplier(retry,
() -> service.searchFlightsThrowingException(request));
try {
System.out.println(retryingFlightSearch.apply());
} catch (...) {
// handle exception that can occur after retries are exhausted
}
Retry.decorateCheckedSupplier()
返回一个 CheckedFunction0
,它表示一个没有参数的函数。请注意对 CheckedFunction0
对象的 apply()
调用以调用远程操作。
如果我们不想使用 Suppliers
,Retry
提供了更多的辅助装饰器方法,如 decorateFunction()
、decorateCheckedFunction()
、decorateRunnable()
、decorateCallable()
等,以与其他语言结构一起使用。decorate*
和 decorateChecked*
版本之间的区别在于,decorate*
版本在 RuntimeExceptions
上重试,而 decorateChecked*
版本在 Exception
上重试。
有条件重试
上面的简单重试示例展示了如何在调用远程服务时遇到 RuntimeException
或已检查 Exception
时重试。在实际应用中,我们可能不想对所有异常都重试。 例如,如果我们得到一个AuthenticationFailedException
重试相同的请求将无济于事。当我们进行 HTTP 调用时,我们可能想要检查 HTTP 响应状态代码或在响应中查找特定的应用程序错误代码来决定是否应该重试。让我们看看如何实现这种有条件的重试。
Predicate-based条件重试
假设航空公司的航班服务定期初始化其数据库中的航班数据。对于给定日期的飞行数据,此内部操作需要几秒钟时间。 如果我们在初始化过程中调用当天的航班搜索,该服务将返回一个特定的错误代码 FS-167。航班搜索文档说这是一个临时错误,可以在几秒钟后重试该操作。
让我们看看如何创建 RetryConfig
:
RetryConfig config = RetryConfig.<SearchResponse>custom()
.maxAttempts(3)
.waitDuration(Duration.of(3, SECONDS))
.retryOnResult(searchResponse -> searchResponse
.getErrorCode()
.equals("FS-167"))
.build();
我们使用 retryOnResult()
方法并传递执行此检查的 Predicate
。这个 Predicate
中的逻辑可以像我们想要的那样复杂——它可以是对一组错误代码的检查,也可以是一些自定义逻辑来决定是否应该重试搜索。
Exception-based条件重试
假设我们有一个通用异常FlightServiceBaseException
,当在与航空公司的航班服务交互期间发生任何意外时会抛出该异常。作为一般策略,我们希望在抛出此异常时重试。但是我们不想重试 SeatsUnavailableException
的一个子类 - 如果航班上没有可用座位,重试将无济于事。我们可以通过像这样创建 RetryConfig
来做到这一点:
RetryConfig config = RetryConfig.custom()
.maxAttempts(3)
.waitDuration(Duration.of(3, SECONDS))
.retryExceptions(FlightServiceBaseException.class)
.ignoreExceptions(SeatsUnavailableException.class)
.build();
在 retryExceptions()
中,我们指定了一个异常列表。ignoreExceptions()
将重试与此列表中的异常匹配或继承的任何异常。我们把我们想忽略而不是重试的那些放入ignoreExceptions()
。如果代码在运行时抛出一些其他异常,比如 IOException
,它也不会被重试。
假设即使对于给定的异常,我们也不希望在所有情况下都重试。也许我们只想在异常具有特定错误代码或异常消息中的特定文本时重试。在这种情况下,我们可以使用 retryOnException
方法:
Predicate<Throwable> rateLimitPredicate = rle ->
(rle instanceof RateLimitExceededException) &&
"RL-101".equals(((RateLimitExceededException) rle).getErrorCode());
RetryConfig config = RetryConfig.custom()
.maxAttempts(3)
.waitDuration(Duration.of(1, SECONDS))
.retryOnException(rateLimitPredicate)
build();
与 predicate-based (基于谓词)的条件重试一样,谓词内的检查可以根据需要复杂化。
退避策略
到目前为止,我们的示例有固定的重试等待时间。通常我们希望在每次尝试后增加等待时间——这是为了让远程服务有足够的时间在当前过载的情况下进行恢复。我们可以使用 IntervalFunction
来做到这一点。
IntervalFunction
是一个函数式接口——它是一个以尝试次数为参数并以毫秒为单位返回等待时间的 Function
。
随机间隔
这里我们指定尝试之间的随机等待时间:
RetryConfig config = RetryConfig.custom()
.maxAttempts(4)
.intervalFunction(IntervalFunction.ofRandomized(2000))
.build();
IntervalFunction.ofRandomized()
有一个关联的 randomizationFactor
。我们可以将其设置为 ofRandomized()
的第二个参数。如果未设置,则采用默认值 0.5。这个 randomizationFactor
决定了随机值的分布范围。因此,对于上面的默认值 0.5,生成的等待时间将介于 1000 毫秒(2000 - 2000 0.5)和 3000 毫秒(2000 + 2000 0.5)之间。
这种行为的示例输出如下:
Searching for flights; current time = 20:27:08 729
Operation failed
Searching for flights; current time = 20:27:10 643
Operation failed
Searching for flights; current time = 20:27:13 204
Operation failed
Searching for flights; current time = 20:27:15 236
Flight search successful
[Flight{flightNumber=XY 765, flightDate=07/31/2020, from=NYC, to=LAX},...]
指数间隔
对于指数退避,我们指定两个值 - 初始等待时间和乘数。在这种方法中,由于乘数,等待时间在尝试之间呈指数增长。例如,如果我们指定初始等待时间为 1 秒,乘数为 2,则重试将在 1 秒、2 秒、4 秒、8 秒、16 秒等之后进行。当客户端是后台作业或守护进程时,此方法是推荐的方法。
以下是我们如何为指数退避创建 RetryConfig
:
RetryConfig config = RetryConfig.custom()
.maxAttempts(6)
.intervalFunction(IntervalFunction.ofExponentialBackoff(1000, 2))
.build();
这种行为的示例输出如下:
Searching for flights; current
time = 20:37:02 684
Operation failed
Searching for flights; current time = 20:37:03 727
Operation failed
Searching for flights; current time = 20:37:05 731
Operation failed
Searching for flights; current time = 20:37:09 731
Operation failed
Searching for flights; current time = 20:37:17 731
IntervalFunction
还提供了一个 exponentialRandomBackoff()
方法,它结合了上述两种方法。我们还可以提供 IntervalFunction 的自定义实现。
重试异步操作
直到现在我们看到的例子都是同步调用。让我们看看如何重试异步操作。假设我们像这样异步搜索航班:
CompletableFuture.supplyAsync(() -> service.searchFlights(request))
.thenAccept(System.out::println);
searchFlight(
) 调用发生在不同的线程上,当它返回时,返回的 List<Flight>
被传递给 thenAccept()
,它只是打印它。
我们可以使用 Retry
对象上的 executeCompletionStage()
方法对上述异步操作进行重试。 此方法采用两个参数 - 一个 ScheduledExecutorService
将在其上安排重试,以及一个 Supplier<CompletionStage>
将被装饰。它装饰并执行 CompletionStage
,然后返回一个 CompletionStage
,我们可以像以前一样调用 thenAccept
:
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Supplier<CompletionStage<List<Flight>>> completionStageSupplier =
() -> CompletableFuture.supplyAsync(() -> service.searchFlights(request));
retry.executeCompletionStage(scheduler, completionStageSupplier)
.thenAccept(System.out::println);
在实际应用程序中,我们将使用共享线程池 (Executors.newScheduledThreadPool()
) 来调度重试,而不是此处显示的单线程调度执行器。
重试事件
在所有这些例子中,装饰器都是一个黑盒子——我们不知道什么时候尝试失败了,框架代码正在尝试重试。假设对于给定的请求,我们想要记录一些详细信息,例如尝试计数或下一次尝试之前的等待时间。 我们可以使用在不同执行点发布的重试事件来做到这一点。Retry 有一个 EventPublisher
,它具有 onRetry()
、onSuccess()
等方法。
我们可以通过实现这些监听器方法来收集和记录详细信息:
Retry.EventPublisher publisher = retry.getEventPublisher();
publisher.onRetry(event -> System.out.println(event.toString()));
publisher.onSuccess(event -> System.out.println(event.toString()));
类似地,RetryRegistry
也有一个 EventPublisher
,它在 Retry
对象被添加或从注册表中删除时发布事件。
重试指标
Retry
维护计数器以跟踪操作的次数
- 第一次尝试成功
- 重试后成功
- 没有重试就失败了
- 重试后仍失败
每次执行装饰器时,它都会更新这些计数器。
为什么要捕获指标?
捕获并定期分析指标可以让我们深入了解上游服务的行为。它还可以帮助识别瓶颈和其他潜在问题。
例如,如果我们发现某个操作通常在第一次尝试时失败,我们可以调查其原因。如果我们发现我们的请求在建立连接时受到限制或超时,则可能表明远程服务需要额外的资源或容量。
如何捕获指标?
Resilience4j 使用 Micrometer 发布指标。Micrometer 为监控系统(如 Prometheus、Azure Monitor、New Relic 等)提供了仪表客户端的外观。因此我们可以将指标发布到这些系统中的任何一个或在它们之间切换,而无需更改我们的代码。
首先,我们像往常一样创建 RetryConfig
和 RetryRegistry
和 Retry
。然后,我们创建一个 MeterRegistry
并将 etryRegistry
绑定到它:
MeterRegistry meterRegistry = new SimpleMeterRegistry();
TaggedRetryMetrics.ofRetryRegistry(retryRegistry).bindTo(meterRegistry);
运行几次可重试操作后,我们显示捕获的指标:
Consumer<Meter> meterConsumer = meter -> {
String desc = meter.getId().getDescription();
String metricName = meter.getId().getTag("kind");
Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false)
.filter(m -> m.getStatistic().name().equals("COUNT"))
.findFirst()
.map(m -> m.getValue())
.orElse(0.0);
System.out.println(desc + " - " + metricName + ": " + metricValue);
};
meterRegistry.forEachMeter(meterConsumer);
一些示例输出如下:
The number of successful calls without a retry attempt - successful_without_retry: 4.0
The number of failed calls without a retry attempt - failed_without_retry: 0.0
The number of failed calls after a retry attempt - failed_with_retry: 0.0
The number of successful calls after a retry attempt - successful_with_retry: 6.0
当然,在实际应用中,我们会将数据导出到监控系统并在仪表板上查看。
重试时的注意事项和良好实践
服务通常提供具有内置重试机制的客户端库或 SDK。对于云服务尤其如此。 例如,Azure CosmosDB 和 Azure 服务总线为客户端库提供内置重试工具。 它们允许应用程序设置重试策略来控制重试行为。
在这种情况下,最好使用内置的重试而不是我们自己的编码。如果我们确实需要自己编写,我们应该禁用内置的默认重试策略 - 否则,它可能导致嵌套重试,其中应用程序的每次尝试都会导致客户端库的多次尝试。
一些云服务记录瞬时错误代码。例如,Azure SQL 提供了它期望数据库客户端重试的错误代码列表。在决定为特定操作添加重试之前,最好检查一下服务提供商是否有这样的列表。
另一个好的做法是将我们在 RetryConfig 中使用的值(例如最大尝试次数、等待时间和可重试错误代码和异常)作为我们服务之外的配置进行维护。如果我们发现新的暂时性错误或者我们需要调整尝试之间的间隔,我们可以在不构建和重新部署服务的情况下进行更改。
通常在重试时,框架代码中的某处可能会发生 Thread.sleep()。对于在重试之间有等待时间的同步重试就是这种情况。如果我们的代码在 Web 应用程序的上下文中运行,则 Thread 很可能是 Web 服务器的请求处理线程。因此,如果我们进行过多的重试,则会降低应用程序的吞吐量。
结论
在本文中,我们了解了 Resilience4j 是什么,以及如何使用它的重试模块使我们的应用程序可以在应对临时错误具备弹性。我们研究了配置重试的不同方法,以及在不同方法之间做出决定的一些示例。我们学习了一些在实施重试时要遵循的良好实践,以及收集和分析重试指标的重要性。
您可以使用 GitHub 上的代码尝试一个完整的应用程序来演示这些想法。
本文译自: Implementing Retry with Resilience4j - Reflectoring
以上是关于使用 Resilience4j 框架实现重试机制的主要内容,如果未能解决你的问题,请参考以下文章
Java 项目中使用 Resilience4j 框架实现异步超时处理
Java 项目中使用 Resilience4j 框架实现故障隔离