用反应堆开火即忘

Posted

技术标签:

【中文标题】用反应堆开火即忘【英文标题】:Fire and forget with reactor 【发布时间】:2019-12-25 05:56:19 【问题描述】:

我的 Spring Boot 应用程序中有如下方法。

public Flux<Data> search(SearchRequest request) 
  Flux<Data> result = searchService.search(request);//this returns Flux<Data>
  Mono<List<Data>> listOfData = result.collectList();
//  doThisAsync() // here I want to pass this list and run some processing on it
// the processing should happen async and the search method should return immediately.
  return result;


//this method uses the complete List<Data> returned by above method
public void doThisAsync(List<Data> data) 
  //do some processing here


目前,我正在使用带有doThisAsync@Async 注释服务类,但不知道如何传递List&lt;Data&gt;,因为我不想调用block。 我只有Mono&lt;List&lt;Data&gt;&gt;

我的主要问题是如何单独处理这个 Mono,search 方法应该返回 Flux&lt;Data&gt;

【问题讨论】:

【参考方案1】:

1,如果你的 fire-and-forget 已经异步返回Mono/Flux

public Flux<Data> search(SearchRequest request)

    return searchService.search(request)
                        .collectList()
                        .doOnNext(data -> doThisAsync(data).subscribe())  // add error logging here or inside doThisAsync
                        .flatMapMany(Flux::fromIterable);


public Mono<Void> doThisAsync(List<Data> data) 
    //do some async/non-blocking processing here like calling WebClient

2,如果你的 fire-and-forget 确实阻塞了 I/O

public Flux<Data> search(SearchRequest request)

    return searchService.search(request)
                        .collectList()
                        .doOnNext(data -> Mono.fromRunnable(() -> doThisAsync(data))
                                              .subscribeOn(Schedulers.elastic())  // delegate to proper thread to not block main flow
                                              .subscribe())  // add error logging here or inside doThisAsync
                        .flatMapMany(Flux::fromIterable);


public void doThisAsync(List<Data> data) 
    //do some blocking I/O on calling thread

请注意,在上述两种情况下,您都会失去背压支持。如果doAsyncThis 由于某种原因变慢了,那么数据生产者不会关心并继续生产项目。这是火与火机制的自然结果。

【讨论】:

在方法2中,搜索方法会不会等到.doOnNext完全执行完? 不,如果它真的是异步的就不会。 对不起,我想我解释错了。 doThisAsync() 是一种运行一些长数据库查询和一些报告服务的普通方法。所有这些本质上都是重调用和阻塞。所以,我希望它单独运行,search 方法应该返回它的响应。 但是它不是像你描述的那样用@Async注释吗?如果是,那么您可以使用上述解决方案。当然,您需要对其进行测试以确保它按预期工作。 我明白了,我在答案中添加了第三个选项。请检查一下。它将阻塞代码移动到单独的线程池中。【参考方案2】:

您是否考虑过使用 publishOn 在单独的线程中运行处理,如下例所示? 这可能不是您所要求的,但允许您继续处理其他事项,而通量中的结果处理由一个或多个线程完成,在我的示例中为四个,来自专用调度程序 (theFourThreadScheduler)。

    @Test
    public void processingInSeparateThreadTest() 
        final Scheduler theFourThreadScheduler = Schedulers.newParallel("FourThreads", 4);
        final Flux<String> theResultFlux = Flux.just("one", "two", "three", "four", "five", "six", "seven", "eight");

        theResultFlux.log()
            .collectList()
            .publishOn(theFourThreadScheduler)
            .subscribe(theStringList -> 
                doThisAsync(theStringList);
            );

        System.out.println("Subscribed to the result flux");

        for (int i = 0; i < 20; i++) 
            System.out.println("Waiting for completion: " + i);
            try 
                Thread.sleep(300);
             catch (final InterruptedException theException) 
            
        
    

    private void doThisAsync(final List<String> inStringList) 
        for (final String theString : inStringList) 
            System.out.println("Processing in doThisAsync: " + theString);
            try 
                Thread.sleep(500);
             catch (final InterruptedException theException) 
            
        
    

运行该示例会产生以下输出,表明 doThisAsync() 中执行的处理是在后台执行的。

Subscribed to the result flux
Waiting for completion: 0
Processing in doThisAsync: one
Waiting for completion: 1
Processing in doThisAsync: two
Waiting for completion: 2
Waiting for completion: 3
Processing in doThisAsync: three
Waiting for completion: 4
Waiting for completion: 5
Processing in doThisAsync: four
Waiting for completion: 6
Processing in doThisAsync: five
Waiting for completion: 7
Waiting for completion: 8
Processing in doThisAsync: six
Waiting for completion: 9
Processing in doThisAsync: seven
Waiting for completion: 10
Waiting for completion: 11
Processing in doThisAsync: eight
Waiting for completion: 12
Waiting for completion: 13
Waiting for completion: 14
Waiting for completion: 15
Waiting for completion: 16
Waiting for completion: 17
Waiting for completion: 18
Waiting for completion: 19

参考资料: Reactor 3 Reference: Schedulers

【讨论】:

doThisAsync() 方法需要整个数据,然后分别对其进行处理。 我更新了我的示例,现在 doThisAsync() 方法可以接收列表中的所有数据。 这不是我想要的,但很接近。非常感谢。 :)

以上是关于用反应堆开火即忘的主要内容,如果未能解决你的问题,请参考以下文章

化学反应速率与化学平衡

进入centos鼠标点击没反应只能用open打开怎么解决

acer电脑鼠标键盘触摸板没反应?

为啥用HDMI线连接电视和电脑没反应?

用反应制作验证系统

用反应调用 API 的最佳实践是啥