从块中读取 Flux<Integer>

Posted

技术标签:

【中文标题】从块中读取 Flux<Integer>【英文标题】:Reading from a Flux<Integer> in chunks 【发布时间】:2021-02-06 11:07:24 【问题描述】:

是否可以从块中读取 webflux 通量? (除了使用 delayElements 之外)

比如我写完之后

Flux.range(1, 10).doOnNext(System.out::println).take(5).subscribe();

有没有办法继续读取后面的 5 个整数?

如果没有,消费者是否有替代方案来决定何时请求下一条数据?

编辑:

为了澄清,我想读取前 5 个值,然后暂停,然后在不重新创建发射器通量的情况下读取接下来的 5 个值。

【问题讨论】:

你的意思是像Flux.buffer()这样的吗? 请edit您的问题包括某种图表,您希望如何从示例中读取值 1,2,3,4,5,6,7,8,9,10 range() 你已经构建了你的应用程序。 此示例代码没有帮助。您正在谈论读取前 5 个值,然后暂停,然后读取接下来的 5 个值,但是您的示例构建了一个总共具有 3 个值的 observable,并且您使用两个单独的订阅来读取它。这没有加起来,它对你想要完成的事情毫无意义。请edit您的问题包括详细描述和/或您希望如何读取数据的图表。 您查看过buffer(int)window(int) 吗?这样,在将事件收集到该组之前,您不会发出值。因此,您可以在此类组发布后申请延迟。 【参考方案1】:

那么您需要一个成熟的异步订阅者对象,而不仅仅是一个方法链。

// use maven dependency 'org.df4j:df4j-core:8.3'
import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Flux;

public class FluxSubscriberTest 

    @Test
    public  void test10() 
        FluxSubscriber subscriber = new FluxSubscriber();
        Flux.range(1, 10).subscribe(subscriber.inp);
        subscriber.start();
        boolean ok = subscriber.blockingAwait(5000);
        Assert.assertTrue(ok);
    

    static class FluxSubscriber extends Actor 
        InpFlow<Integer> inp = new InpFlow<>(this, 5);
        int count = 0;

        @Override
        protected void runAction() throws Throwable 
            if (inp.isCompleted()) 
                System.out.println("input stream completed");
                complete();
                return;
            
            Integer value = inp.remove();
            System.out.println("value="+value);
            if (++count==5) 
                count = 0;
                System.out.println("pause:");
                delay(1000);
            
        
    

实际上,它先读取 5 个项目,然后在每次调用 inp.remove() 后一个接一个地读取。如果这不是您想要的,那么您可以扩展类 InpFlow 以在调用 Subscription.request() 时修改策略。

源代码可在https://github.com/akaigoro/df4j 获得(是的,我是作者)。

【讨论】:

以上是关于从块中读取 Flux<Integer>的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spring Webflux 中返回 Mono<Map<String, Flux<Integer>>> 响应?

如何正确读取 Flux<DataBuffer> 并将其转换为单个 inputStream

从块 0 中获取密钥列表

无法将对象从块添加到 NSMutableArray

如何在文件行出现并将它们表示为 Flux 时读取它们?

Django无法读取块中的脚本