如何从另一种方法向反应堆热通量动态添加元素?
Posted
技术标签:
【中文标题】如何从另一种方法向反应堆热通量动态添加元素?【英文标题】:How to dynamically add elements to reactor hot flux from another method? 【发布时间】:2020-08-22 13:57:18 【问题描述】:我有一个数据源服务,它以观察者为参数。
void subscribe(Consumer onEventConsumer);
我想使用通量作为 RSocket 的响应流。 我怎样才能做到这一点? 我现在看到的应该是这样的
Flux<T> controllerMethod(RequestMessage mgs)
var flux = Flux.empty();
dataSource.subscribe(event -> flux.push(event));
return flux;
但是我很怀疑这是一个合适的解决方案,而且我是响应式方法的新手,我不知道我应该在这里使用什么方法?
【问题讨论】:
【参考方案1】:正如 Simon 已经指出的,这就是您使用 Flux.create
的目的。
看看projectreactor.io上的Getting Started Guide。
在镜头中,您在 create
方法的 lambda 中注册一个自定义侦听器:
Flux<String> bridge = Flux.create(sink ->
myEventProcessor.register(
new MyEventListener<String>()
public void onDataChunk(List<String> chunk)
for(String s : chunk)
sink.next(s);
public void processComplete()
sink.complete();
);
);
您要做的是将传入的元素传递给FluxSink,然后它将在 Flux 上发布这些元素。
【讨论】:
【参考方案2】:这是 Flux.create 的一个典型用例。您从 create lambda 内部注册一个观察者,它将接收到的数据传递给提供的 FluxSink
【讨论】:
在我们想在 Http 调用之后向通量中添加元素的情况下,您的建议是否有效?即客户端向端点发送一个 POST 请求,其中包含他想要发布的消息。谢谢 这听起来像是一个完全不同的用例。在 OP 的情况下,单个Flux
需要反映通过调用(据我所知非反应性)subscriber(Consumer)
建立的单个侦听器。在您的情况下,听起来对该端点的多次调用都应该发布到同一个Flux
。请查看 3.4.0 中的 Sinks
API。
这正是我所做的!它制作精良,似乎涵盖了多线程的安全生产。谢谢西蒙。以上是关于如何从另一种方法向反应堆热通量动态添加元素?的主要内容,如果未能解决你的问题,请参考以下文章