如何多次消耗无限通量

Posted

技术标签:

【中文标题】如何多次消耗无限通量【英文标题】:How to consume infinite flux multiple times 【发布时间】:2022-01-15 12:57:01 【问题描述】:

这就是我想要实现的目标:

当有人请求http://localhost/runIt 时,我想从缓存中返回每 6 秒刷新一次的数据。下面,我有一个通量(总是存储在地图中的相同),它是第一次实例化并开始发出数字 0,1,2,3,4... 到无穷大。

是否可以让这个 Spring MVC 控制器方法在第一次请求时返回 "1,2",然后在 7 秒后请求返回 "3,4" 等?

另外,如果lastRunIt 在 60 秒内没有更新,我需要终止通量。

下面这段代码是我想到的,但它目前根本不起作用。

Map<Integer, Flux<String>> itos = new ConcurrentHashMap<>();
  Instant lastRunIt;

  @GetMapping("runIt")
  public Flux<String> runIt()
    lastRunIt = Instant.now();
    return itos.computeIfAbsent(1, k ->
        Flux.interval(Duration.ZERO, Duration.ofSeconds(3))
          .doOnNext(x -> 
            //dispose if no request for 60 seconds
            if(lastRunIt.plusSeconds(60).isBefore(Instant.now()))
              //someDispisable.dispose(); //<--- HOW TO GET Disposable here?
            
            System.out.println(x);
          )
          .cache(Duration.ofSeconds(6))
    );

  

【问题讨论】:

【参考方案1】:

好的,我设法做了一些看起来有效的事情。想知道它是否可以这样使用,或者这里有资源过度使用的可能性,或者更紧凑的方式来表示这一点?

  Map<Integer, Flux<String>> itos = new ConcurrentHashMap<>();
  Instant lastRunIt;

  Subscription subskrip;

  @GetMapping("runIt")
  public Flux<String> runIt()
    lastRunIt = Instant.now();
    return itos.computeIfAbsent(1, k -> 
      return Flux.interval(Duration.ZERO, Duration.ofSeconds(3))
          .doOnSubscribe(sb -> 
            subskrip = sb; //save subscription first time this is called
          )
          .doOnNext(x -> 
            //dispose if no request for 10 seconds
            if(lastRunIt.plusSeconds(10).isBefore(Instant.now()))
                System.out.println("DISPOSINGGG");
                subskrip.cancel(); //cancel this flux
                itos.remove(1); //remove from map
            
            System.out.println(x);
          )
          .cache(Duration.ofSeconds(9))
            .take(3) //on every REST request take only 3 items that are already in cache
          .map(x -> ">" + x);
      );
  

【讨论】:

以上是关于如何多次消耗无限通量的主要内容,如果未能解决你的问题,请参考以下文章

NodeJS 在多次请求后无限期挂起

多次在应用程序功能中购买非消耗品

Spring webflux:从请求中消耗单声道或通量

计费计划的多次付款定义

用户多次点击-处理办法

用户多次点击-处理办法