从另一个 API 端点动态触发现有 Flux - Spring Webflux
Posted
技术标签:
【中文标题】从另一个 API 端点动态触发现有 Flux - Spring Webflux【英文标题】:Dynamically trigger existing Flux from another API end point - Spring Webflux 【发布时间】:2020-03-17 09:50:11 【问题描述】:我正在尝试使用 web-flux 构建一个微服务,它将根据特定订阅者的事件发送/发布一些数据。
通过以下实现 (Another Stackflow Issue),我可以创建一个发布者,当我们通过调用“/send”API 触发事件时,所有订阅者都会自动接收数据
@SpringBootApplication
@RestController
public class DemoApplication
final FluxProcessor processor;
final FluxSink sink;
final AtomicLong counter;
public static void main(String[] args)
SpringApplication.run(DemoApplication.class, args);
public DemoApplication()
this.processor = DirectProcessor.create().serialize();
this.sink = processor.sink();
this.counter = new AtomicLong();
@GetMapping("/send/userId")
public void test(@PathVariable("userId") String userId)
sink.next("Hello World #" + counter.getAndIncrement());
@RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent> sse()
return processor.map(e -> ServerSentEvent.builder(e).build());
问题陈述 - 我的应用程序具有基于用户的访问权限,并且对于每个用户,都会有一些我只想根据事件推送的通知。在这里,事件将使用用户 ID 存储在数据库中,当我们从另一个 API 中点击“发送”端点以及“userId”作为路径变量时,它应该只发送与该用户相关的数据,前提是它已注册作为订阅者并仍在收听频道。
【问题讨论】:
您好,您的问题恐怕不清楚。你能更新它以清楚地解释你的要求/问题吗? @AkhilBojedla - 我已经用我的问题陈述更新了问题。 【参考方案1】:这不是一个准确或实际的解决方案,但这件事有效: 首先是 SSE 端点:
@RestController
public class SSEController
private String value = "";
public String getValue()
return value;
public void setValue(String value)
this.value = value;
@GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<String> getWords()
System.out.println("sse ?");
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> getValue());
从任何服务或任何您可以自动装配的地方,只需自动装配即可:
@Service
public class SomeService
@Autowired
private SSEController sseController;
...
void something()
....
sseController.setValue("some value");
....
这是我正在使用的方式。
【讨论】:
以上是关于从另一个 API 端点动态触发现有 Flux - Spring Webflux的主要内容,如果未能解决你的问题,请参考以下文章