如何从另一种方法向反应堆热通量动态添加元素?

Posted

技术标签:

【中文标题】如何从另一种方法向反应堆热通量动态添加元素?【英文标题】:How to dynamically add elements to reactor hot flux from another method? 【发布时间】:2020-08-22 13:57:18 【问题描述】:

我有一个数据源服务,它以观察者为参数。

void subscribe(Consumer onEventConsumer);

我想使用通量作为 RSocket 的响应流。 我怎样才能做到这一点? 我现在看到的应该是这样的

Flux<T> controllerMethod(RequestMessage mgs) 
   var flux = Flux.empty();
   dataSource.subscribe(event -> flux.push(event));
   return flux;

但是我很怀疑这是一个合适的解决方案,而且我是响应式方法的新手,我不知道我应该在这里使用什么方法?

【问题讨论】:

【参考方案1】:

正如 Simon 已经指出的,这就是您使用 Flux.create 的目的。

看看projectreactor.io上的Getting Started Guide。

在镜头中,您在 create 方法的 lambda 中注册一个自定义侦听器:

Flux<String> bridge = Flux.create(sink -> 
    myEventProcessor.register( 
      new MyEventListener<String>()  

        public void onDataChunk(List<String> chunk) 
          for(String s : chunk) 
            sink.next(s); 
          
        

        public void processComplete() 
            sink.complete(); 
        
    );
);

您要做的是将传入的元素传递给FluxSink,然后它将在 Flux 上发布这些元素。

【讨论】:

【参考方案2】:

这是 Flux.create 的一个典型用例。您从 create lambda 内部注册一个观察者,它将接收到的数据传递给提供的 FluxSink

【讨论】:

在我们想在 Http 调用之后向通量中添加元素的情况下,您的建议是否有效?即客户端向端点发送一个 POST 请求,其中包含他想要发布的消息。谢谢 这听起来像是一个完全不同的用例。在 OP 的情况下,单个Flux 需要反映通过调用(据我所知非反应性)subscriber(Consumer) 建立的单个侦听器。在您的情况下,听起来对该端点的多次调用都应该发布到同一个Flux。请查看 3.4.0 中的 Sinks API。 这正是我所做的!它制作精良,似乎涵盖了多线程的安全生产。谢谢西蒙。

以上是关于如何从另一种方法向反应堆热通量动态添加元素?的主要内容,如果未能解决你的问题,请参考以下文章

哪一种是向网站添加地理位置元信息的正确方法?

如何在本机反应中向视图添加动态宽度和颜色?

如何在一种方法中为数组赋值并从另一种方法调用

从另一个方法将子视图添加到子视图

另一种在WINFORM中使用XNA的方法

如何在特定位置添加项目