Flux.subscribe 在处理的最后一个元素之前完成

Posted

技术标签:

【中文标题】Flux.subscribe 在处理的最后一个元素之前完成【英文标题】:Flux.subscribe finishes before the last element in processed 【发布时间】:2019-11-14 08:57:59 【问题描述】:

Spring + Flux 的奇怪行为。我有 Python 服务器代码(使用 Flask,但这并不重要,将其视为伪代码),它是流式响应:

def generate():
    for row in range(0,10):
        time.sleep(1)
        yield json.dumps("count": row) + '\n'
return Response(generate(), mimetype='application/json')

这样,我模拟处理列表中的一些任务,并在它们准备好后立即向我发送结果,而不是等待所有事情都完成,主要是为了避免将所有内容先保存在服务器的内存中,然后再保存在客户。现在我想用 Spring WebClient 来使用它:

Flux<Count> alerts = webClient
        .post()
        .uri("/testStream")
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToFlux( Count.class )
        .log();
alerts.subscribe(a -> log.debug("Received count: " + a.count));
Mono<Void> mono = Mono.when(alerts);
mono.block();
log.debug("All done in method");

这是我在日志中得到的内容:

2019-07-03 18:45:08.330 DEBUG 16256 --- [ctor-http-nio-4] c.k.c.restapi.rest.Controller     : Received count: 8

2019-07-03 18:45:09.323  INFO 16256 --- [ctor-http-nio-2] reactor.Flux.MonoFlatMapMany.4           : onNext(com.ksftech.chainfacts.restapi.rest.Controller$Count@55d09f83)

2019-07-03 18:45:09.324  INFO 16256 --- [ctor-http-nio-2] reactor.Flux.MonoFlatMapMany.4           : onComplete()

2019-07-03 18:45:09.325 DEBUG 16256 --- [io-28088-exec-4] c.k.c.restapi.rest.Controller     : All done in method

2019-07-03 18:45:09.331  INFO 16256 --- [ctor-http-nio-4] reactor.Flux.MonoFlatMapMany.4           : onNext(com.ksftech.chainfacts.restapi.rest.Controller$Count@da447dd)

2019-07-03 18:45:09.332 DEBUG 16256 --- [ctor-http-nio-4] c.k.c.restapi.rest.Controller     : Received count: 9
2019-07-03 18:45:09.333  INFO 16256 --- [ctor-http-nio-4] reactor.Flux.MonoFlatMapMany.4           : onComplete()

注意在 mono.block 返回后订阅如何处理最后一个对象。我知道 Reactor 是异步的,一旦它看不到更多对象,它就会释放 Mono 并并行调用我的代码订阅。那么调度器就看什么先运行了。

我想出了一个相当丑陋的组合,即使用 completeConsumer 订阅,并使用旧的等待/通知。然后它工作正常。但是有没有更优雅的方法来确保我的方法等到 Flux 的所有元素都被处理完?

【问题讨论】:

【参考方案1】:

好的,我研究了这个领域,意识到 Reactor 是用于异步执行的。如果我需要同步,我必须使用同步。要在订阅所有内容后执行代码,我需要使用 doOnComplete:

public class FluxResult 
  public boolean success = true;
  public Exception ex = null;
  public void error() success = false;
  public void error(Exception e) success = false; ex = e;

  public synchronized void waitForFluxCompletion() throws InterruptedException 
    wait();
  

  public synchronized void notifyAboutFluxCompletion() 
    notify();
  


.... // do something which returns Flux
myflux
          .doFirst(() -> 
             // initialization
          )
          .doOnError(e -> 
            log.error("Exception", e);
          )
          .doOnComplete(() -> 
            try 
              // finalization. If we were accumulating objects, now flush them
            
            catch (Exception e) 
              log.error("Exception", e);
              flux_res.error(e);
            
            finally 
              flux_res.notifyAboutFluxCompletion();
            
          )
          .subscribe(str -> 
            // something which must be executed for each item
          );

然后等待对象发出信号:

  flux_res.waitForFluxCompletion();
  if (!flux_res.success) 
    if (flux_res.ex != null) 

【讨论】:

以上是关于Flux.subscribe 在处理的最后一个元素之前完成的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Python 中以不同方式处理列表中的最后一个元素?

循环中的最后一个元素是不是值得单独处理?

特殊套管最后一个元素的最佳循环成语

JavaScript强化教程——数组的基本处理函数

JavaScript强化教程——数组的基本处理函数

34. 在排序数组中查找元素的第一个和最后一个位置