[RxJS] Reusable multicasting with Subject factories

Posted Answer1215

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[RxJS] Reusable multicasting with Subject factories相关的知识,希望对你有一定的参考价值。

The way we use publish() (or multicast with an RxJS Subject) makes the shared Observable not reusable if the shared execution happens to complete or emit an error. In this lesson we will see how to use a simple Subject factory function in order to create a new Subject, one for each shared execution, whenever connect() is called.

 

var shared = Rx.Observable.interval(1000).take(3)
  .do(x => console.log(source  + x))
  .multicast(new Rx.Subject())
  .refCount();

The code above, after subject emit 0,1,2, three values, then it completes. It means if you want to subscribe the subject again, it won‘t emit anything because it is completed. 

 

If you want to reuse the ‘shared‘ subject even after subject complete, you need to use subject factories, which simply just a function return new Subject():

function subjectFactory() {
  return new Rx.Subject(); 
}

var shared = Rx.Observable.interval(1000).take(3)
  .do(x => console.log(source  + x))
  .multicast(subjectFactory)
  .refCount();

 

So now even you resubscribe after subject complete, it will emit you new value.

function subjectFactory() {
  return new Rx.Subject(); 
}

var shared = Rx.Observable.interval(1000).take(3)
  .do(x => console.log(source  + x))
  .multicast(subjectFactory)
  .refCount();

// subject: --0--1--2--3--4--5|
//                               A
// subject2:                     --0--1--2--3--4--5|

var observerA = {
  next: function (x) { console.log(A next  + x); },
  error: function (err) { console.log(A error  + err); },
  complete: function () { console.log(A done); },
};

var subA = shared.subscribe(observerA); // 0 => 1
console.log(subscribed A);

var observerB = {
  next: function (x) { console.log(B next  + x); },
  error: function (err) { console.log(B error  + err); },
  complete: function () { console.log(B done); },
};

var subB;
setTimeout(function () {
  subB = shared.subscribe(observerB);
  console.log(subscribed B);
}, 2000);

setTimeout(function () {
  subA.unsubscribe();
  console.log(unsubscribed A);
}, 3000);

setTimeout(function () {
  subB.unsubscribe();
  console.log(unsubscribed B);
}, 5000);

setTimeout(function () {
  subA = shared.subscribe(observerA); // 0 => 1 (connect)
  console.log(subscribed A);
}, 6000);

 

/**
"subscribed A"
"source 0"
"A next 0"
"source 1"
"A next 1"
"subscribed B"
"source 2"
"A next 2"
"B next 2"
"A done"
"B done"
"unsubscribed A"
"unsubscribed B"
"subscribed A"
"source 0"
"A next 0"
"source 1"
"A next 1"
"source 2"
"A next 2"
"A done"

*/

 

以上是关于[RxJS] Reusable multicasting with Subject factories的主要内容,如果未能解决你的问题,请参考以下文章

[RxJS] Multicasting shortcuts: publish() and variants

[RxJS] Multicast with a selector argument, as a sandbox

[TypeStyle] Reusable styles using TypeStyle mixins

通过hadoop配置文件快速构建可reusable的Hadoop fs

UITableViewCell (Reusable Cell) 没有识别正确的约束

Working With Playbooks--Creating Reusable Playbooks