如何在可迭代的 Spring Boot Flux 中延迟发射每个项目

Posted

技术标签:

【中文标题】如何在可迭代的 Spring Boot Flux 中延迟发射每个项目【英文标题】:How to delay emitting each item in iterable Spring Boot Flux 【发布时间】:2021-12-31 03:52:44 【问题描述】:

我的问题略有不同,但我可以用以下方式描述问题。

我想要的是一些在每个延迟周期(3 秒)发出一个项目的代码。但是当我点击/flux URL 时,页面会等待 3 秒并给出所有 4 个项目。这意味着它会在 3 秒后发出所有项目,而不是每 3 秒发出一个项目。

@RestController
@RequestMapping("/flux")
public class MyController 

    List<Item> items = Arrays.asList(
            new Item("name1","description1"),
            new Item("name2","description2"),
            new Item("name3","description3"),
            new Item("name4","description4"));
    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<Item> getItems()
        return Flux.fromIterable(items)
                .delayElements(Duration.ofSeconds(3));
    

@Data
@AllArgsConstructor
class Item
    String name;
    String description;

更新

我看到这个post 来解释如何在 RxJava 中做到这一点,所以我尝试了这个。但是ZipWith 的结果更糟。现在页面等待 12 秒。这意味着只有在 Flux 完成时才会发送浏览器响应……不知道为什么。

Flux<Item> getItems()
            return Flux.fromIterable(items)
           .zipWith(Flux.interval(Duration.ofSeconds(3)),(item,time)->item);

附言使用 Spring WebFlux 依赖,所以本地启动的是 Netty 而不是 Tomcat。

【问题讨论】:

【参考方案1】:

我们可以使用delayElements(Duration delay) 告诉反应器在特定延迟后发射一个元素。它总是尝试在特定持续时间后发出元素而不阻塞。

将此 URL 放入 google chrome 浏览器或任何其他消费者客户端,例如 curl,而不是邮递员。


卷曲

curl --location --request GET 'http://localhost:8080/fluxv2' \
--header 'Content-Type: text/event-stream' \
--data-raw ''

不要尝试使用 postman 测试 API,因为 postman 目前不支持流式 API。


@RestController
@RequestMapping("/fluxv2")
public class MyController 
    Flux<String> stringFlux = Flux.fromIterable(List.of("CM", "Abdullah", "Khan"));
    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<String> getItems() 
        return stringFlux.delayElements(Duration.ofSeconds(3)).log();
    

Logs shown that elements are emitting after specific delay

【讨论】:

【参考方案2】:

问题不在于代码,问题在于浏览器。 如果我们使用 Chrome,上面提到的两个代码都可以按预期工作。但不适用于 safari 浏览器

【讨论】:

以上是关于如何在可迭代的 Spring Boot Flux 中延迟发射每个项目的主要内容,如果未能解决你的问题,请参考以下文章

我如何在可运行的 Spring Boot 中使用 @autowire

Flux 响应而不是 WebSocket (Spring boot)

如何将Spring Boot RepositoryRestResource映射到特定的URL

如何在 Spring Boot 中以内存高效的方式迭代 MySQL 中的大量记录

在 Spring Boot 客户端中接收通量

如何正确读取 Flux<DataBuffer> 并将其转换为单个 inputStream