RxJS:我将如何“手动”更新 Observable?

Posted

技术标签:

【中文标题】RxJS:我将如何“手动”更新 Observable?【英文标题】:RxJS: How would I "manually" update an Observable? 【发布时间】:2016-01-24 06:46:06 【问题描述】:

我想我一定是误解了一些基本的东西,因为在我看来,这应该是 observable 的最基本情况,但对于我的生活,我无法从文档中弄清楚如何做到这一点。

基本上,我希望能够做到这一点:

// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer));
var observer = eventObservable.subscribe(
   function(x)
     console.log('next: ' + x);
   
...
var my_function = function()
  eventObservable.push('foo'); 
  //'push' adds an event to the datastream, the observer gets it and prints 
  // next: foo

但是我一直找不到像push 这样的方法。我将它用于点击处理程序,我知道他们有Observable.fromEvent,但我正在尝试将它与 React 一起使用,我宁愿能够简单地在回调中更新数据流,而不是使用完全不同的事件处理系统。所以基本上我想要这个:

$( "#target" ).click(function(e) 
  eventObservable.push(e.target.text()); 
);

我得到的最接近的是使用observer.onNext('foo'),但这似乎并没有真正起作用,并且它被调用了观察者,这似乎不正确。观察者应该是对数据流做出反应的东西,而不是改变它,对吧?

我只是不了解观察者/可观察者的关系吗?

【问题讨论】:

看看这个来澄清你的想法(你一直缺少的反应式编程的介绍):gist.github.com/staltz/868e7e9bc2a7b8c1f754。这里也有很多资源可以提高你的理解:github.com/Reactive-Extensions/RxJS#resources 我检查了第一个,似乎是一个可靠的资源。第二个是一个很棒的列表,我在上面找到了aaronstacy.com/writings/reactive-programming-and-mvc,它帮助我发现了 Rx.Subject,它解决了我的问题。那谢谢啦!一旦我写了更多的应用程序,我会发布我的解决方案,只是想稍微测试一下。 呵呵,非常感谢你提出这个问题,我正准备用同样的代码示例问同样的问题:-) 【参考方案1】:

在 RX 中,Observer 和 Observable 是不同的实体。一个观察者订阅一个 Observable。 Observable 通过调用观察者的方法向其观察者发出项目。如果您需要在Observable.create() 范围之外调用观察者方法,您可以使用Subject,它是同时充当观察者和Observable 的代理。

你可以这样做:

var eventStream = new Rx.Subject();

var subscription = eventStream.subscribe(
   function (x) 
        console.log('Next: ' + x);
    ,
    function (err) 
        console.log('Error: ' + err);
    ,
    function () 
        console.log('Completed');
    );

var my_function = function() 
  eventStream.next('foo'); 

您可以在此处找到有关主题的更多信息:

https://github.com/ReactiveX/rxjs/blob/master/docs_app/content/guide/subject.md http://reactivex.io/documentation/subject.html

【讨论】:

这实际上正是我最终要做的!我一直在努力,看看是否能找到更好的方法来做我需要做的事情,但这绝对是一个可行的解决方案。我首先在这篇文章中看到它:aaronstacy.com/writings/reactive-programming-and-mvc. 如果他们不能使用 Subjects 并且必须使用 observables 怎么办? @IanSteffy 如果你创建一个新的 Observable,它是从旧的 Observable 和一个新的主题中创建的(通过合并),那么你可以使用这个主题来提供新的 Observable? @PEZO 这行得通。谢谢。【参考方案2】:

我相信Observable.create() 不会将 observer 作为回调参数,而是使用发射器。因此,如果您想为 Observable 添加新值,请尝试以下操作:

var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = 
  next: function(next) 
    console.log(next);
  ,
  error: function(error) 
    console.log(error);
  ,
  complete: function() 
    console.log("done");
  

observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();

//console output
//"foo"
//"bar"
//"baz"
//"done"

是的,Subject 更容易,在同一个对象中提供 Observable 和 Observer,但并不完全相同,因为当一个 observable 只向最后一个订阅的观察者发送数据时,Subject 允许您订阅多个观察者到同一个 observable,所以有意识地使用它。 如果你想修改它,这里是JsBin。

【讨论】:

RxJS 手册中是否记录了覆盖发射器属性的可能性? 在这种情况下,emitter 只会为最后订阅的观察者提供 next() 新值。更好的方法是将所有emitters 收集到一个数组中并遍历它们,next 对它们中的每一个进行迭代 用什么来代替不推荐使用的电话Observable.create()呢?我尝试了new Observable(emitter),但它的行为不像我预期的那样。 ***.com/q/65060800/958373

以上是关于RxJS:我将如何“手动”更新 Observable?的主要内容,如果未能解决你的问题,请参考以下文章

[RxJS] Error handling operator: catch

Angular RxJS入门笔记 (Observable可观察对象Subscribe订阅Observer观察者Subscription对象)

响应式编程实战——RxJS 手动停止事件流的正确方式

如何通过 redux-observable/rxjs 使用 Firestore 实时更新(onSnapshot)?

rxjs中如何做链序

如何在 Angular 4 中推送到数组的 Observable? RxJS