Project Reactor 3 中的 publishOn 与 subscribeOn

Posted

技术标签:

【中文标题】Project Reactor 3 中的 publishOn 与 subscribeOn【英文标题】:publishOn vs subscribeOn in Project Reactor 3 【发布时间】:2018-06-12 21:18:34 【问题描述】:

我在相同的通量上使用 publishOn 和 subscribeOn,如下所示:

    System.out.println("*********Calling Concurrency************");
    List<Integer> elements = new ArrayList<>();
    Flux.just(1, 2, 3, 4)
      .map(i -> i * 2)
      .log()
      .publishOn(Schedulers.elastic())
      .subscribeOn(Schedulers.parallel())
      .subscribe(elements::add);
    System.out.println("-------------------------------------");

虽然,当我同时使用两者时,日志中不会打印任何内容。 但是当我只使用 publishOn 时,我得到了以下信息日志:

*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------

publishOn 是否比 subscribeOn 更推荐?或者它比 subscribeOn 有更多的偏好?两者有什么区别,什么时候用哪个?

【问题讨论】:

***.com/q/41939335/438319 对一个非常相似的问题的回答很好 woolha.com/tutorials/… 【参考方案1】:

我花了一些时间才理解它,可能是因为publishOn通常在subscribeOn之前解释,这里有一个希望更简单的外行解释。

subscribeOn 表示在指定的调度程序工作人员(其他线程)上运行初始源发射,例如 subscribe(), onSubscribe() and request(),对于任何后续操作(例如 onNext/onError/onComplete, map etc)也是如此,无论 subscribeOn() 的位置如何,这种行为会发生

如果你没有在 fluent 调用中执行任何 publishOn 操作,那么一切都会在这样的线程上运行。

但是一旦你调用 publishOn() 让我们说在中间,那么任何后续的操作员调用都将在提供的调度程序工作人员上运行到这样的publishOn()

这是一个例子

Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());

Flux.range(1, 5)
        .doOnNext(consumer)
        .map(i -> 
          System.out.println("Inside map the thread is " + Thread.currentThread().getName());
          return i * 10;
        )
        .publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
        .doOnNext(consumer)
        .publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
        .doOnNext(consumer)
        .subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
        .subscribe();

结果是


1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5

如您所见,第一个 doOnNext() 和下面的 map() 正在名为 subscribeOn_thread 的线程上运行,直到调用任何 publishOn() 才会发生这种情况,然后任何后续调用都将在提供的调度程序上运行到该 @ 987654335@,这将再次发生在任何后续呼叫中,直到有人呼叫另一个 publishOn()

【讨论】:

如果你忘记了 map 每个周期执行一次而不是提前执行的事实,那么你在其中的代码是一个痛苦的脑筋急转弯,“Inside map”是用相同的线程名称打印的事实对 subscribeOn 如何更改发射线程以及 publishOn 如何根据链的位置而不是调用的时刻更改执行线程的一个很棒的解释。很不错【参考方案2】:

这是我得到的一个小文档:

publishOn 的应用方式与任何其他运营商相同,位于订阅者链的中间。它从下游获取信号并在上游重播它们,同时从关联的调度程序对工作人员执行回调。因此,它会影响后续操作符的执行位置(直到另一个 publishOn 被链接进来)。

subscribeOn 适用于订阅过程,当构建该反向链时。因此,无论您将 subscribeOn 放在链中的哪个位置,它总是会影响源发射的上下文。但是,这不会影响后续调用 publishOn 的行为。它们仍然会为它们之后的链部分切换执行上下文。

publishOn 强制下一个运算符(可能还有下一个运算符之后的后续运算符)在不同的线程上运行。同样,subscribeOn 强制前一个运算符(可能还有前一个运算符之前的运算符)在不同的线程上运行。

【讨论】:

准确来说“publishOn”应该是从上游到下游,而不是相反【参考方案3】:

以下内容摘自优秀博文https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers

发布时间

这是您想要跳线程时需要的基本运算符。来自其源的传入信号在给定的调度程序上发布,有效地将线程切换到该调度程序的工作人员之一。

这对onNextonCompleteonError 信号有效。也就是说,信号从上游源流向下游订阅者。

所以本质上,出现在该运算符下方的每个处理步骤都将在新的 Scheduler 上执行,直到另一个运算符再次切换(例如另一个 publishOn)。

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

输出

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C

订阅开启

这个操作符改变了订阅方法的执行位置。并且由于订阅信号向上流动,它直接影响源 Flux 订阅并开始生成数据的位置。

因此,它似乎可以向上和向下作用于操作符的反应链部分(只要没有publishOn 被抛出):

final Flux<String> fetchUrls(List<String> urls) 
  return Flux.fromIterable(urls)
           .map(url -> blockingWebClient.get(url));


// sample code:
fetchUrls(A, B, C)
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

fetchUrls(D, E)
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

输出

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C

【讨论】:

以上是关于Project Reactor 3 中的 publishOn 与 subscribeOn的主要内容,如果未能解决你的问题,请参考以下文章

使用 Project Reactor 中的 ExchangeFunction 从客户端请求中下载并保存文件

#yyds干货盘点#Project Reactor

使用Project Reactor对反应流进行递归

Project Reactor 之 publishOn 与 subscribeOn

Project Reactor:仅在未发出第一项时通量超时

带有消息代理(例如 Kafka)的事件驱动微服务与反应式编程(RxJava、Project Reactor)以及改进的协议(RSocket)