rxjs 5 发布重播引用计数

Posted

技术标签:

【中文标题】rxjs 5 发布重播引用计数【英文标题】:rxjs 5 publishReplay refCount 【发布时间】:2017-06-30 14:19:53 【问题描述】:

我不知道publishReplay().refCount() 是如何工作的。

例如(https://jsfiddle.net/7o3a45L1/):

var source = Rx.Observable.create(observer =>  
  console.log("call"); 
  // expensive http request
  observer.next(5);
).publishReplay().refCount();

subscription1 = source.subscribe(next: (v) => console.log('observerA: ' + v));
subscription1.unsubscribe();
console.log(""); 

subscription2 = source.subscribe(next: (v) => console.log('observerB: ' + v));
subscription2.unsubscribe();
console.log(""); 

subscription3 = source.subscribe(next: (v) => console.log('observerC: ' + v));
subscription3.unsubscribe();
console.log(""); 

subscription4 = source.subscribe(next: (v) => console.log('observerD: ' + v));
subscription4.unsubscribe();

给出以下结果:

调用观察者A:5

observerB: 5 调用observerB: 5

observerC:5observerC:5 调用observerC:5

observerD:5observerD:5observerD:5 调用observerD:5

1) 为什么多次调用observerB、C、D?

2) 为什么每行都打印“call”而不是行首?

另外,如果我调用publishReplay(1).refCount(),它会分别调用observerB、C 和D 2 次。

我希望每个新的观察者只收到一次值 5,并且“调用”只打印一次。

【问题讨论】:

【参考方案1】:

publishReplay(x).refCount() 组合执行以下操作:

它会创建一个ReplaySubject,最多可重放x 个发射。如果 x 没有定义,那么它会重播完整的流。 它使用 refCount() 运算符使 ReplaySubject 多播兼容。这会导致并发订阅接收相同的排放。

您的示例包含一些问题,混淆了它们如何协同工作。见以下修改后的sn-p:

var state = 5
var realSource = Rx.Observable.create(observer =>  
  console.log("creating expensive HTTP-based emission"); 
  observer.next(state++);
//  observer.complete();
  
  return () => 
    console.log('unsubscribing from source')
  
);


var source = Rx.Observable.of('')
  .do(() => console.log('stream subscribed'))
  .ignoreElements()
  .concat(realSource)
.do(null, null, () => console.log('stream completed'))
.publishReplay()
.refCount()
;
    
subscription1 = source.subscribe(next: (v) => console.log('observerA: ' + v));
subscription1.unsubscribe();
 
subscription2 = source.subscribe(v => console.log('observerB: ' + v));
subscription2.unsubscribe();
    
subscription3 = source.subscribe(v => console.log('observerC: ' + v));
subscription3.unsubscribe();
    
subscription4 = source.subscribe(v => console.log('observerD: ' + v));
 
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

当运行这个 sn-p 时,我们可以清楚地看到它没有为 Observer D 发出重复的值,它实际上是为每个订阅创建新的发射。怎么会?

每次订阅都会在下一次订阅发生之前取消订阅。这有效地使 refCount 减少回零,没有进行多播。

问题在于realSource 流未完成。因为我们没有多播,所以下一个订阅者通过 ReplaySubject 获得了一个新的 realSource 实例,并且新的排放量与之前已经排放的排放量相加。

因此,要修复您的流多次调用昂贵的 HTTP 请求,您必须完成该流,以便 publishReplay 知道它不需要重新订阅。

【讨论】:

【参考方案2】:

通常:refCount 表示,只要至少有 1 个订阅者,流就是热/共享的 - 但是,当没有订阅者时,它会被重置/冷。

这意味着,如果您想绝对确定不会多次执行任何操作,则不应使用 refCount() 而只需使用 connect 将流设置为热。

附加说明:如果您在observer.next(5); 之后添加observer.complete(),您也将获得预期的结果。


旁注:您真的需要在这里创建自己的自定义Obervable 吗?在 95% 的情况下,现有运算符足以满足给定的用例。

【讨论】:

【参考方案3】:

发生这种情况是因为您使用的是publishReplay()。它在内部创建了一个 ReplaySubject 的实例,用于存储所有通过的值。

由于您使用Observable.create 发出单个值,因此每次调用source.subscribe(...) 时都会将一个值附加到ReplaySubject 中的缓冲区。

您不会在每行的开头打印call,因为它是ReplaySubject,在您订阅时首先发出其缓冲区,然后它自己订阅其源:

实现细节见:

https://github.com/ReactiveX/rxjs/blob/master/src/operator/multicast.ts#L63

https://github.com/ReactiveX/rxjs/blob/master/src/ReplaySubject.ts#L54

使用publishReplay(1) 时也是如此。首先它从ReplaySubject 发出缓冲项,然后从observer.next(5); 发出另一个项

【讨论】:

唯一相关的答案

以上是关于rxjs 5 发布重播引用计数的主要内容,如果未能解决你的问题,请参考以下文章

python的计数引用分析

Linux内核(linux-5.2.9)--内核对象(对象的引用计数)

自动引用计数 (ARC) 中的 NSThreads

引用计数

第29条:理解引用计数

初步了解JVM