从块中读取 Flux<Integer>
Posted
技术标签:
【中文标题】从块中读取 Flux<Integer>【英文标题】:Reading from a Flux<Integer> in chunks 【发布时间】:2021-02-06 11:07:24 【问题描述】:是否可以从块中读取 webflux 通量? (除了使用 delayElements 之外)
比如我写完之后
Flux.range(1, 10).doOnNext(System.out::println).take(5).subscribe();
有没有办法继续读取后面的 5 个整数?
如果没有,消费者是否有替代方案来决定何时请求下一条数据?
编辑:
为了澄清,我想读取前 5 个值,然后暂停,然后在不重新创建发射器通量的情况下读取接下来的 5 个值。
【问题讨论】:
你的意思是像Flux.buffer()
这样的吗?
请edit您的问题包括某种图表,您希望如何从示例中读取值 1,2,3,4,5,6,7,8,9,10 range()
你已经构建了你的应用程序。
此示例代码没有帮助。您正在谈论读取前 5 个值,然后暂停,然后读取接下来的 5 个值,但是您的示例构建了一个总共具有 3 个值的 observable,并且您使用两个单独的订阅来读取它。这没有加起来,它对你想要完成的事情毫无意义。请edit您的问题包括详细描述和/或您希望如何读取数据的图表。
您查看过buffer(int)
或window(int)
吗?这样,在将事件收集到该组之前,您不会发出值。因此,您可以在此类组发布后申请延迟。
【参考方案1】:
那么您需要一个成熟的异步订阅者对象,而不仅仅是一个方法链。
// use maven dependency 'org.df4j:df4j-core:8.3'
import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Flux;
public class FluxSubscriberTest
@Test
public void test10()
FluxSubscriber subscriber = new FluxSubscriber();
Flux.range(1, 10).subscribe(subscriber.inp);
subscriber.start();
boolean ok = subscriber.blockingAwait(5000);
Assert.assertTrue(ok);
static class FluxSubscriber extends Actor
InpFlow<Integer> inp = new InpFlow<>(this, 5);
int count = 0;
@Override
protected void runAction() throws Throwable
if (inp.isCompleted())
System.out.println("input stream completed");
complete();
return;
Integer value = inp.remove();
System.out.println("value="+value);
if (++count==5)
count = 0;
System.out.println("pause:");
delay(1000);
实际上,它先读取 5 个项目,然后在每次调用 inp.remove()
后一个接一个地读取。如果这不是您想要的,那么您可以扩展类 InpFlow
以在调用 Subscription.request()
时修改策略。
源代码可在https://github.com/akaigoro/df4j 获得(是的,我是作者)。
【讨论】:
以上是关于从块中读取 Flux<Integer>的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spring Webflux 中返回 Mono<Map<String, Flux<Integer>>> 响应?