在Spring 5中调试Reactive Streams

Posted Spring中文网

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在Spring 5中调试Reactive Streams相关的知识,希望对你有一定的参考价值。

案例概述

调试Reactive Streams可能是我们开始使用这些数据结构后必须面对的主要挑战之一。

考虑到Reactive Streams在过去几年中越来越受欢迎,了解我们如何有效地执行此任务是个好主意。

让我们首先使用Reactive Streams设置项目,看看为什么这通常很麻烦。

带有错误的场景

我们想要模拟一个实际情况,其中运行了几个异步进程,并且我们在代码中引入了一些最终会触发异常的缺陷。

为了理解全局,我们将提到我们的应用程序将使用和处理简单Foo对象流,这些对象只包含id、formattedName和quantity字段。

分析日志输出

现在,让我们检查一个片段以及当出现未处理的错误时它生成的输出:

 
   
   
 
  1. public void processFoo(Flux<Foo> flux) {

  2.     flux = FooNameHelper.concatFooName(flux);

  3.     flux = FooNameHelper.substringFooName(flux);

  4.     flux = FooReporter.reportResult(flux);

  5.     flux.subscribe();

  6. }

  7.  

  8. public void processFooInAnotherScenario(Flux<Foo> flux) {

  9.     flux = FooNameHelper.substringFooName(flux);

  10.     flux = FooQuantityHelper.divideFooQuantity(flux);

  11.     flux.subscribe();

  12. }

运行我们的应用程序几秒钟后,我们会看到它会不时记录异常。

仔细查看其中一个错误,我们会发现类似于此的内容:

 
   
   
 
  1. Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15

  2.     at j.l.String.substring(String.java:1963)

  3.     at com.baeldung.debugging.consumer.service.FooNameHelper

  4.       .lambda$1(FooNameHelper.java:38)

  5.     at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:100)

  6.     at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:114)

  7.     at r.c.p.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275)

  8.     at r.c.p.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849)

  9.     at r.c.p.Operators$MonoSubscriber.complete(Operators.java:1476)

  10.     at r.c.p.MonoDelayUntil$DelayUntilCoordinator.signal(MonoDelayUntil.java:211)

  11.     at r.c.p.MonoDelayUntil$DelayUntilTrigger.onComplete(MonoDelayUntil.java:290)

  12.     at r.c.p.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:118)

  13.     at r.c.s.SchedulerTask.call(SchedulerTask.java:50)

  14.     at r.c.s.SchedulerTask.call(SchedulerTask.java:27)

  15.     at j.u.c.FutureTask.run(FutureTask.java:266)

  16.     at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask

  17.       .access$201(ScheduledThreadPoolExecutor.java:180)

  18.     at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask

  19.       .run(ScheduledThreadPoolExecutor.java:293)

  20.     at j.u.c.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

  21.     at j.u.c.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

  22.     at j.l.Thread.run(Thread.java:748)

基于根本原因,并注意到堆栈跟踪中提到的FooNameHelper类,我们可以想象在某些情况下,我们的Foo对象正在使用比预期更短的formattedName 值进行处理。

当然,这只是一个简化的案例,解决方案似乎相当明显。

但是让我们假设这是一个真实案例场景,如果没有一些上下文信息,异常本身并不能帮助我们解决问题。

异常是作为processFoo或processFooInAnotherScenario方法的一部分触发的吗?

在到达此阶段之前,其他前面的步骤是否影响了formattedName字段?

日志条目无法帮助我们找出这些问题。

更糟糕的是,有时甚至不会从我们的功能中抛出异常。

例如,假设我们依赖反应式存储库来保存我们的Foo对象。如果此时错误上升,我们甚至可能不知道从哪里开始调试代码。

我们需要工具来有效地调试反应流。

使用调试会话

确定我们的应用程序正在发生什么的一个选项是使用我们喜欢的IDE启动调试会话。

我们必须设置几个条件断点,并在流中的每个步骤执行时分析数据流。

实际上,当我们有大量的被动进程在运行和共享资源时,这可能是一项繁琐的任务。

此外,在许多情况下,出于安全原因,我们无法启动调试会话。

使用doOnError方法或使用订阅参数记录信息

有时,我们可以通过提供Consumer作为subscribe方法的第二个参数来添加有用的上下文信息:

 
   
   
 
  1. public void processFoo(Flux<Foo> flux) {

  2.  

  3.     // ...

  4.  

  5.     flux.subscribe(foo -> {

  6.         logger.debug("Finished processing Foo with Id {}", foo.getId());

  7.     }, error -> {

  8.         logger.error(

  9.           "The following error happened on processFoo method!",

  10.            error);

  11.     });

  12. }

注意:值得一提的是,如果我们不需要对subscribe方法进行进一步处理,我们可以在发布者上链接doOnError函数:

 
   
   
 
  1. flux.doOnError(error -> {

  2.     logger.error("The following error happened on processFoo method!", error);

  3. }).subscribe();

现在我们将对错误的来源提供一些指导,即使我们仍然没有太多关于生成异常的实际元素的信息。

激活Reactor的全局调试配置

Reactor库提供了一个hook类,它允许我们配置Flux和Mono操作符的行为。

通过添加以下语句,我们的应用程序将检测对发布者方法的调用,包装运算符的构造,并捕获堆栈跟踪:

 
   
   
 
  1. Hooks.onOperatorDebug();

这样就可以默认启用Thymeleaf - 无需额外配置。 调试模式激活后,我们的异常日志将包含一些有用的信息:

 
   
   
 
  1. 16:06:35.334 [parallel-1] ERROR c.b.d.consumer.service.FooService

  2.   - The following error happened on processFoo method!

  3. java.lang.StringIndexOutOfBoundsException: String index out of range: 15

  4.     at j.l.String.substring(String.java:1963)

  5.     at c.d.b.c.s.FooNameHelper.lambda$1(FooNameHelper.java:38)

  6.     ...

  7.     at j.l.Thread.run(Thread.java:748)

  8.     Suppressed: r.c.p.FluxOnAssembly$OnAssemblyException:

  9. Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :

  10.     reactor.core.publisher.Flux.map(Flux.java:5653)

  11.     c.d.b.c.s.FooNameHelper.substringFooName(FooNameHelper.java:32)

  12.     c.d.b.c.s.FooService.processFoo(FooService.java:24)

  13.     c.d.b.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)

  14.     o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)

  15.     o.s.s.s.DelegatingErrorHandlingRunnable

  16.       .run(DelegatingErrorHandlingRunnable.java:54)

  17.     o.u.c.Executors$RunnableAdapter.call(Executors.java:511)

  18.     o.u.c.FutureTask.runAndReset(FutureTask.java:308)

  19. Error has been observed by the following operator(s):

  20.     |_    Flux.map c.d.b.c.s.FooNameHelper

  21.             .substringFooName(FooNameHelper.java:32)

  22.     |_    Flux.map c.d.b.c.s.FooReporter.reportResult(FooReporter.java:15)

我们可以看到,第一部分保持相对相同,但以下部分提供了有关以下内容的信息:

  • 发布者的程序集跟踪 - 这里我们可以确认错误最初是在processFoo方法中生成的。

  • 在第一次触发错误之后观察到错误的运算符,以及链接它们的用户类。

注意:在这个例子中,主要是为了清楚地看到这一点,我们在不同的类上添加操作。

我们可以随时打开或关闭调试模式,但它不会影响已经实例化的Flux和Mono对象。

在不同的线程上执行运算符

要记住的另一个方面是即使在流上运行不同的线程,也会正确生成程序集跟踪。

我们来看看下面的例子:

 
   
   
 
  1. public void processFoo(Flux<Foo> flux) {

  2.     flux = flux.publishOn(Schedulers.newSingle("foo-thread"));

  3.     // ...

  4.  

  5.     flux = flux.publishOn(Schedulers.newSingle("bar-thread"));

  6.     flux = FooReporter.reportResult(flux);

  7.     flux.subscribeOn(Schedulers.newSingle("starter-thread"))

  8.       .subscribe();

  9. }

现在,如果我们检查日志,我们会理解在这种情况下,第一部分可能会稍微改变,但最后两部分保持相同。

第一部分是线程堆栈跟踪,因此它只显示特定线程执行的操作。

正如我们所看到的,当我们调试应用程序时,这不是最重要的部分,因此这种更改是可以接受的。

在单个进程上激活调试输出

在每个单一的反应过程中检测和生成堆栈跟踪都是昂贵的。

因此,我们应该只在关键情况下实施前一种方法。

无论如何,Reactor提供了一种在单个关键进程上启用调试模式的方法,这样可以减少内存消耗。

我们指的是检查点操作员:

 
   
   
 
  1. public void processFoo(Flux<Foo> flux) {

  2.      

  3.     // ...

  4.  

  5.     flux = flux.checkpoint("Observed error on processFoo", true);

  6.     flux.subscribe();

  7. }

请注意,以这种方式,将在检查点阶段记录程序集跟踪:

 
   
   
 
  1. Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15

  2.     ...

  3. Assembly trace from producer [reactor.core.publisher.FluxMap],

  4.   described as [Observed error on processFoo] :

  5.     r.c.p.Flux.checkpoint(Flux.java:3096)

  6.     c.b.d.c.s.FooService.processFoo(FooService.java:26)

  7.     c.b.d.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)

  8.     o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)

  9.     o.s.s.s.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)

  10.     j.u.c.Executors$RunnableAdapter.call(Executors.java:511)

  11.     j.u.c.FutureTask.runAndReset(FutureTask.java:308)

  12. Error has been observed by the following operator(s):

  13.     |_    Flux.checkpoint c.b.d.c.s.FooService.processFoo(FooService.java:26)

我们应该在反应链的末尾实施检查点方法。

否则,操作员将无法观察下游发生的错误。

另外,请注意,库提供了重载方法。我们可以避免:

  • 如果我们使用no-args选项,则指定观察到的错误的描述

  • 通过仅提供自定义描述来生成填充堆栈跟踪(这是最昂贵的操作)

记录元素序列

最后,Reactor发布商提供了一种在某些情况下可能会派上用场的方法。

通过在我们的反应链中调用log方法,应用程序将使用它在该阶段具有的状态记录流中的每个元素。

让我们在我们的例子中尝试一下:

 
   
   
 
  1. public void processFoo(Flux<Foo> flux) {

  2.     flux = FooNameHelper.concatFooName(flux);

  3.     flux = FooNameHelper.substringFooName(flux);

  4.     flux = flux.log();

  5.     flux = FooReporter.reportResult(flux);

  6.     flux = flux.doOnError(error -> {

  7.         logger.error("The following error happened on processFoo method!", error);

  8.     });

  9.     flux.subscribe();

  10. }

并检查日志:

 
   
   
 
  1. INFO  reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)

  2. INFO  reactor.Flux.Map.1 - request(unbounded)

  3. INFO  reactor.Flux.Map.1 - onNext(Foo(id=0, formattedName=theFo, quantity=8))

  4. INFO  reactor.Flux.Map.1 - onNext(Foo(id=1, formattedName=theFo, quantity=3))

  5. INFO  reactor.Flux.Map.1 - onNext(Foo(id=2, formattedName=theFo, quantity=5))

  6. INFO  reactor.Flux.Map.1 - onNext(Foo(id=3, formattedName=theFo, quantity=6))

  7. INFO  reactor.Flux.Map.1 - onNext(Foo(id=4, formattedName=theFo, quantity=6))

  8. INFO  reactor.Flux.Map.1 - cancel()

  9. ERROR c.b.d.consumer.service.FooService

  10.   - The following error happened on processFoo method!

  11. ...

我们可以在此阶段轻松查看每个Foo对象的状态,以及在异常发生时框架如何取消流。

当然,这种方法也很昂贵,我们必须适度使用它。

案例结论

如果我们不知道正确调试应用程序的工具和机制,我们可能会花费大量时间和精力来解决问题。

如果我们不习惯处理被动和异步数据结构,那么尤其如此,我们需要额外的帮助来弄清楚事情是如何工作的。


以上是关于在Spring 5中调试Reactive Streams的主要内容,如果未能解决你的问题,请参考以下文章

Reactive-Spring-Security-5.1.3.RELEASE,多重授权

Spring 5 Reactive - WebExceptionHandler 没有被调用

如何在 Postman 中查看 Spring 5 Reactive API 的响应?

Spring 5 Reactive 中的 HTTP 响应异常处理

Spring 5:使用 Spring Webflux 开发 Reactive 应用

更新到 Spring 5.1 - java.lang.NoClassDefFoundError: org/springframework/http/server/reactive/ServletSer