从另一个 API 端点动态触发现有 Flux - Spring Webflux

Posted

技术标签:

【中文标题】从另一个 API 端点动态触发现有 Flux - Spring Webflux【英文标题】:Dynamically trigger existing Flux from another API end point - Spring Webflux 【发布时间】:2020-03-17 09:50:11 【问题描述】:

我正在尝试使用 web-flux 构建一个微服务,它将根据特定订阅者的事件发送/发布一些数据。

通过以下实现 (Another Stackflow Issue),我可以创建一个发布者,当我们通过调用“/send”API 触发事件时,所有订阅者都会自动接收数据

@SpringBootApplication
@RestController
public class DemoApplication 

    final FluxProcessor processor;
    final FluxSink sink;
    final AtomicLong counter;

    public static void main(String[] args) 
        SpringApplication.run(DemoApplication.class, args);

    

    public DemoApplication() 
        this.processor = DirectProcessor.create().serialize();
        this.sink = processor.sink();
        this.counter = new AtomicLong();
    

    @GetMapping("/send/userId")
    public void test(@PathVariable("userId") String userId) 
        sink.next("Hello World #" + counter.getAndIncrement());
    

    @RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent> sse() 
        return processor.map(e -> ServerSentEvent.builder(e).build());
    

问题陈述 - 我的应用程序具有基于用户的访问权限,并且对于每个用户,都会有一些我只想根据事件推送的通知。在这里,事件将使用用户 ID 存储在数据库中,当我们从另一个 API 中点击“发送”端点以及“userId”作为路径变量时,它应该只发送与该用户相关的数据,前提是它已注册作为订阅者并仍在收听频道。

【问题讨论】:

您好,您的问题恐怕不清楚。你能更新它以清楚地解释你的要求/问题吗? @AkhilBojedla - 我已经用我的问题陈述更新了问题。 【参考方案1】:

这不是一个准确或实际的解决方案,但这件事有效: 首先是 SSE 端点:

@RestController
public class SSEController 
    private String value = "";
    public String getValue() 
        return value;
    
    public void setValue(String value) 
        this.value = value;
    
    @GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<String> getWords() 
        System.out.println("sse ?");
        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> getValue());
    

从任何服务或任何您可以自动装配的地方,只需自动装配即可:

@Service
public class SomeService 
    @Autowired
    private SSEController sseController;
    ...
    void something()
        ....
        sseController.setValue("some value");
        ....
    

这是我正在使用的方式。

【讨论】:

以上是关于从另一个 API 端点动态触发现有 Flux - Spring Webflux的主要内容,如果未能解决你的问题,请参考以下文章

Spring中RESTful API上flux行为的解释

Flux+React.js - 缓存 API 请求响应

Spring反应式编程:如何创建一个动态的Publishers列表作为Flux.merge的输入。

使用“无目标端点”在现有 API 代理上创建新端点

使用自定义端点扩展现有 API

如何从另一种方法向反应堆热通量动态添加元素?