How do I create a Flux using generate wrapping calls that return a Mono

Posted: 2022-01-09 05:24:18

Question:

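I have a case where I want to use Flux.generate, because I don't want to make expensive blocking calls unless and until the subscriber asks for them. Specifically, I call Elasticsearch repeatedly (effectively paginating) until there are no more hits. I have implemented this with standard blocking calls inside an Iterator<SearchResponse>. Each invocation of the generate lambda blocks, and the flux is finished off with .subscribeOn(Schedulers.boundedElastic()). However, I would like to use Spring's ReactiveElasticsearchClient, which returns a Mono<SearchResponse>, while still making the calls one at a time.

Here is the earlier code, using blocking calls: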


  public Iterator<SearchResponse> createDeepQueryIterator(@NonNull PITSearchInput input) {
    return new PointInTimeIterator(elasticClient, input);
  }

  public Flux<SearchResponse> createDeepQueryFlux(@NonNull PITSearchInput input) {
    return Flux.<SearchResponse, PointInTimeIterator>generate(
            // State supplier: one iterator per subscriber.
            () -> new PointInTimeIterator(elasticClient, input),
            // Generator: invoked once per requested element; next() blocks on ES.
            (deepQueryIterator, sink) -> {
              if (deepQueryIterator.hasNext()) {
                sink.next(deepQueryIterator.next());
              } else {
                sink.complete();
              }
              return deepQueryIterator;
            },
            // State consumer: release the point-in-time on termination or cancel.
            (deepQueryIterator) -> deepQueryIterator.shutdown())
        .subscribeOn(Schedulers.boundedElastic());
  }

The approach above works well because it does not make the next call to ES until the subscriber is ready to receive the next chunk of data.
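The PointInTimeIterator class itself is not shown in the question. As a rough sketch of the idea it relies on, the following JDK-only iterator performs exactly one blocking fetch per next() call and stops after a short page, mirroring the hitCount < maxSizePerQuery check used further down. The class name LazyPageIterator, the fetchPage function and the String hits are all hypothetical stand-ins, not the author's code.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

/** Hypothetical lazily paging iterator: each next() performs exactly one blocking fetch. */
final class LazyPageIterator implements Iterator<List<String>> {
    private final IntFunction<List<String>> fetchPage; // blocking call: page index -> hits
    private final int pageSize;
    private int nextIndex = 0;
    private boolean exhausted = false;

    LazyPageIterator(IntFunction<List<String>> fetchPage, int pageSize) {
        this.fetchPage = fetchPage;
        this.pageSize = pageSize;
    }

    @Override
    public boolean hasNext() {
        return !exhausted;
    }

    @Override
    public List<String> next() {
        if (exhausted) {
            throw new NoSuchElementException();
        }
        List<String> hits = fetchPage.apply(nextIndex++);
        // A page shorter than pageSize means there is nothing left to fetch.
        if (hits.size() < pageSize) {
            exhausted = true;
        }
        return hits;
    }

    public static void main(String[] args) {
        List<String> data = List.of("a", "b", "c", "d", "e");
        int[] calls = {0}; // counts how many fetches have actually run
        LazyPageIterator pages = new LazyPageIterator(index -> {
            calls[0]++;
            int from = Math.min(index * 2, data.size());
            int to = Math.min(from + 2, data.size());
            return data.subList(from, to);
        }, 2);

        System.out.println("fetches before first next(): " + calls[0]);
        while (pages.hasNext()) {
            System.out.println(pages.next() + " after " + calls[0] + " fetch(es)");
        }
    }
}
```

Because nothing is fetched until next() is called, wrapping such an iterator in Flux.generate ties each fetch to one requested element. Note that when the total number of hits is an exact multiple of the page size, this sketch returns one final empty page before stopping.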

Below is my attempt with Spring's ReactiveElasticsearchClient. The problem is that multiple calls are made to ES before the subscriber has processed the first response.


  public Flux<SearchResponse> createDeepQuery(PointInTimeIteratorFactory.PITSearchInput input) {
    log.info("Creating flux");

    AtomicReference<PitId> pitId = new AtomicReference<>();
    AtomicInteger count = new AtomicInteger();

    // Creating the PIT is a blocking call, so run it on boundedElastic.
    Mono<PitId> pitIdMono =
        Mono.fromCallable(
                () -> {
                  pitId.set(createPIT(input));
                  return pitId.get();
                })
            .subscribeOn(Schedulers.boundedElastic());
    Mono<SearchResponse> searchResponseMono =
        pitIdMono.flatMap(
            p -> {
              log.info("Calling search");
              return reactiveElasticsearchClient.searchForResponse(createSearchRequestFrom(p, input));
            });
    // Each response is expanded into the Mono for the next page until a stop condition is met.
    Flux<SearchResponse> expand =
        searchResponseMono
            .expand(
                searchResponse -> {
                  int hitCount = searchResponse.getHits().getHits().length;
                  count.addAndGet(hitCount);
                  log.info("Previous returned {} hits totaling {}", hitCount, count.get());

                  if (count.get() > input.getMaxTotalSize()
                      || hitCount < input.getMaxSizePerQuery()) {
                    log.info("Returning empty");
                    return Mono.empty();
                  }

                  log.info("Calling search");
                  pitId.set(new PitId(searchResponse.pointInTimeId()));
                  return reactiveElasticsearchClient.searchForResponse(
                      createSearchRequestFrom(searchResponse, input));
                })
            .doFinally(
                p -> {
                  deletePIT(pitId.get());
                });
    return expand;
  }

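So the problem is not whether the reactive client's Mono<SearchResponse> can be returned within a Flux, but how to make those calls only one at a time, as the subscriber demands them.

Below is the logging from the Flux -> Mono approach above. The PitTest lines come from the test's onNext() handling of the flux.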

2021-12-02 13:13:37.300  INFO 13704 --- [           main] a.a.t.ReactivePointInTimeIteratorFactory : Creating flux
2021-12-02 13:13:37.346  INFO 13704 --- [oundedElastic-1] a.a.t.ReactivePointInTimeIteratorFactory : Creating PIT
2021-12-02 13:13:37.407  INFO 13704 --- [oundedElastic-1] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.176  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 50
2021-12-02 13:13:38.177  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.177  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877306267
2021-12-02 13:13:38.228  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 100
2021-12-02 13:13:38.228  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.228  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877606162
2021-12-02 13:13:38.271  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 150
2021-12-02 13:13:38.271  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.272  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877606362
2021-12-02 13:13:38.311  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 200
2021-12-02 13:13:38.312  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Calling search
2021-12-02 13:13:38.312  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Setting searchAfter to 1634877906244
2021-12-02 13:13:38.344  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Previous returned 50 hits totaling 250
2021-12-02 13:13:38.345  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Returning empty
2021-12-02 13:13:38.345  INFO 13704 --- [or-http-epoll-2] a.a.t.ReactivePointInTimeIteratorFactory : Closing PIT ReactivePointInTimeIteratorFactory.PitId(id=m_2xAwENYWN0aXZpdHlzdG9yZRZQQkRGWldmclI2cWZITEpoWDI1cGlRABZCZU8xbm55ZlFabXREYmNEdThESG1RAAAAAAAAWQcTFm5BcXdPU2xTUWE2bEU4dkVPVkpkWFEBFlBCREZaV2ZyUjZxZkhMSmhYMjVwaVEAAA==)
2021-12-02 13:13:40.171  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : [1634877306066]
2021-12-02 13:13:42.172  INFO 13704 --- [     parallel-2] p.actss.activity.store.PitTest           : [1634877306272]
2021-12-02 13:13:44.172  INFO 13704 --- [     parallel-3] p.actss.activity.store.PitTest           : [1634877606166]
2021-12-02 13:13:46.173  INFO 13704 --- [     parallel-4] p.actss.activity.store.PitTest           : [1634877906057]
2021-12-02 13:13:48.174  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : [1634877906248]
2021-12-02 13:13:48.174  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : Complete
2021-12-02 13:13:48.174  INFO 13704 --- [           main] p.actss.activity.store.PitTest           : blah
2021-12-02 13:13:48.175  INFO 13704 --- [     parallel-1] p.actss.activity.store.PitTest           : onComplete

Update: adding the PitTest code for completeness:


  @Test
  void testReactoiveFluxIt() throws InterruptedException {
    Flux<SearchResponse> deepQuery = reactivePointInTimeIteratorFactory.createDeepQuery(...);

    deepQuery
        .delayElements(Duration.ofMillis(2000))
        .doOnNext(p -> log.info(Arrays.toString(p.getHits().getHits()[0].getSortValues()))) //
        .doOnComplete(() -> log.info("Complete")) //
        .doFinally(p -> log.info(p.toString()))
        .blockLast();
    log.info("blah");
    Thread.sleep(5000);
  }

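Comments:

The subscriber should be able to control the flow and request items one by one. Sample code showing what your subscriber does would help.

The problem in this case is that multiple calls are made to Elasticsearch before the subscriber code is invoked for the first time. For now, all the subscriber code does is print the "sort" of the last hit in onNext(). See the lines logged by PitTest. Note, though, that "Calling search" runs multiple times before the first PitTest line is logged.

@lkatiforis Why is PitTest running on the parallel scheduler?

@lkatiforis The original Mono starts with fromCallable and has onSubscribe(boundedElastic): Mono.fromCallable(() -> { pitId.set(createPIT(input)); return pitId.get(); }).subscribeOn(Schedulers.boundedElastic()); Even without the subscribeOn, I still see all the searches happen before PitTest gets the first one.

Answer 1: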

delayElements switches to the parallel scheduler and delays each emitted element by 2 seconds. That is why the sort values are printed afterwards.
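To make the desired "one call at a time" ordering concrete, here is a minimal sketch that deliberately uses plain CompletableFuture instead of Reactor, so that it runs with only the JDK. It is an analogy, not the Reactor solution: fetchPage is a hypothetical stand-in for a call that returns Mono<SearchResponse>, Page stands in for SearchResponse, and the consumer's future plays the role of the subscriber finishing its work. The next fetch is chained onto the consumer's completion, so no page is requested ahead of time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class DemandDrivenPaging {

    /** Hypothetical stand-in for a search response: a page number and a "last page" flag. */
    static final class Page {
        final int number;
        final boolean last;

        Page(int number, boolean last) {
            this.number = number;
            this.last = last;
        }
    }

    /** Event log used to inspect how fetches and consumption interleave. */
    static final List<String> events = new ArrayList<>();

    /** Hypothetical async search call; it records that a fetch was started. */
    static CompletableFuture<Page> fetchPage(int number, int totalPages) {
        events.add("fetch " + number);
        return CompletableFuture.completedFuture(new Page(number, number == totalPages));
    }

    /**
     * Fetches one page, hands it to the consumer, and starts the next fetch
     * only after the consumer's future has completed.
     */
    static CompletableFuture<Void> pageThrough(
            int number, int totalPages, Function<Page, CompletableFuture<Void>> consumer) {
        return fetchPage(number, totalPages)
                .thenCompose(page -> consumer.apply(page)
                        .thenCompose(done -> page.last
                                ? CompletableFuture.<Void>completedFuture(null)
                                : pageThrough(number + 1, totalPages, consumer)));
    }

    public static void main(String[] args) {
        pageThrough(1, 3, page -> {
            events.add("consume " + page.number);
            return CompletableFuture.<Void>completedFuture(null);
        }).join();
        // prints [fetch 1, consume 1, fetch 2, consume 2, fetch 3, consume 3]
        System.out.println(events);
    }
}
```

In Reactor terms, the equivalent idea is to make the slow processing part of the chain so that demand reaches the source one element at a time. How many elements an operator such as delayElements requests from upstream may depend on the Reactor version, so it is worth adding doOnRequest to the pipeline to see what is actually being requested.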
