Spring WebFlux(Flux):如何动态发布

Posted

技术标签:

【中文标题】Spring WebFlux(Flux):如何动态发布【英文标题】:Spring WebFlux (Flux): how to publish dynamically 【发布时间】:2018-12-24 12:16:09 【问题描述】:

我是响应式编程和 Spring WebFlux 的新手。我想让我的 App 1 通过 Flux 发布 Server Sent 事件,我的 App 2 持续监听它。

我希望 Flux 按需发布(例如,当发生某些事情时)。我发现的所有示例都是使用 Flux.interval 定期发布事件,一旦创建,似乎无法在 Flux 中附加/修改内容。

我怎样才能实现我的目标?或者我在概念上完全错误。

【问题讨论】:

【参考方案1】:

使用FluxProcessorFluxSink“动态”发布

手动向Flux 提供数据的技术之一是使用FluxProcessor#sink 方法,如下例所示

@SpringBootApplication
@RestController
public class DemoApplication 

    final FluxProcessor processor;
    final FluxSink sink;
    final AtomicLong counter;

    public static void main(String[] args) 
        SpringApplication.run(DemoApplication.class, args);

    

    public DemoApplication() 
        this.processor = DirectProcessor.create().serialize();
        this.sink = processor.sink();
        this.counter = new AtomicLong();
    

    @GetMapping("/send")
    public void test() 
        sink.next("Hello World #" + counter.getAndIncrement());
    

    @RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent> sse() 
        return processor.map(e -> ServerSentEvent.builder(e).build());
    

在这里,我创建了DirectProcessor 以支持多个订阅者,它将监听数据流。此外,我还提供了额外的 FluxProcessor#serialize,它为多生产者提供了安全支持(从不同线程调用而不违反 Reactive Streams 规范规则,尤其是 rule 1.3)。最后,通过调用“http://localhost:8080/send”我们会看到消息Hello World #1(当然,前提是你之前连接到了“http://localhost:8080”)

Reactor 3.4 更新

在 Reactor 3.4 中,您有一个名为 reactor.core.publisher.Sinks 的新 API。 Sinks API 为手动数据发送提供了一个流利的构建器,它允许您指定诸如流中的元素数量和背压行为、支持的订阅者数量和重放功能:

@SpringBootApplication
@RestController
public class DemoApplication 

    final Sinks.Many sink;
    final AtomicLong counter;

    public static void main(String[] args) 
        SpringApplication.run(DemoApplication.class, args);

    

    public DemoApplication() 
        this.sink = Sinks.many().multicast().onBackpressureBuffer();
        this.counter = new AtomicLong();
    

    @GetMapping("/send")
    public void test() 
        EmitResult result = sink.tryEmitNext("Hello World #" + counter.getAndIncrement());

        if (result.isFailure()) 
          // do something here, since emission failed
        
    

    @RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent> sse() 
        return sink.asFlux().map(e -> ServerSentEvent.builder(e).build());
    

注意,通过Sinks API 发送消息引入了emission 的新概念及其结果。这种 API 的原因是 Reactor 扩展了 Reactive-Streams 并且必须遵循背压控制。也就是说,如果您emit 的信号比请求的多,并且底层实现不支持缓冲,则您的消息将不会被传递。因此,tryEmitNext 的结果返回了EmitResult,表示消息是否已发送。

另外,请注意,默认情况下Sinsk API 提供了Sink 的序列化版本,这意味着您不必关心并发性。但是,如果您事先知道消息的发送是串行的,您可以构建一个不序列化给定消息的Sinks.unsafe() 版本

【讨论】:

我对此进行了测试,效果很好!要订阅该活动,请访问“localhost:8080”。谢谢! 如何为 Mono 做到这一点? 非常有趣。你会碰巧记得你是从哪里了解到这些事情的吗?从未见过这种实现方式。 您还想像这样参数化您的FluxProcessor&lt;IN, OUT&gt;this.processor = DirectProcessor.&lt;T&gt;create().serialize() 其中 T 是您的事件类型 认为最好在 map() 之前使用 processor.subscribeOn(Schedulers.parallel()) 来拆分“send”和“sse”的线程【参考方案2】:

只是另一个想法,使用 EmitterProcessor 作为通量的网关

    import reactor.core.publisher.EmitterProcessor;
    import reactor.core.publisher.Flux;

    public class MyEmitterProcessor 
        EmitterProcessor<String> emitterProcessor;

        public static void main(String args[]) 

            MyEmitterProcessor myEmitterProcessor = new MyEmitterProcessor();
            Flux<String> publisher = myEmitterProcessor.getPublisher();
            myEmitterProcessor.onNext("A");
            myEmitterProcessor.onNext("B");
            myEmitterProcessor.onNext("C");
            myEmitterProcessor.complete();

            publisher.subscribe(x -> System.out.println(x));

        

        public Flux<String> getPublisher() 
            emitterProcessor = EmitterProcessor.create();
            return emitterProcessor.map(x -> "consume: " + x);
         

        public  void onNext(String nextString) 
            emitterProcessor.onNext(nextString);
        

        public  void complete() 
            emitterProcessor.onComplete();
        
    

更多信息,see here from Reactor doc。文档本身有一条建议,“大多数时候,您应该尽量避免使用处理器。它们更难正确使用,并且容易出现一些极端情况。”但我不知道是哪种极端情况。

【讨论】:

与此同时,EmitterProcessor 类已被标记为已弃用,并将在 3.5 版中删除。作为替代解决方案,建议使用Sinks.many().multicast().onBackpressureBuffer()(如该问题的第一个答案中所建议的那样)。

以上是关于Spring WebFlux(Flux):如何动态发布的主要内容,如果未能解决你的问题,请参考以下文章

Log Spring webflux 类型 - Mono 和 Flux

Spring 5 WebFlux Mono 和 Flux

如何在 Spring Webflux 中返回 Mono<Map<String, Flux<Integer>>> 响应?

Spring Webflux(Mono/Flux) 与 AOP 在拦截时触发 REST 调用并使用 Mono/Flux

Spring WebFlux,单元测试 Mono 和 Flux

spring webflux Flux<DataBuffer> 转换为 InputStream