如何延迟创建可观察对象

Posted

技术标签:

【中文标题】如何延迟创建可观察对象【英文标题】:How can I create an observable with a delay 【发布时间】:2017-07-13 19:54:10 【问题描述】:

问题

出于测试目的,我正在创建 Observable 对象,这些对象将实际 http 调用返回的 observable 替换为 Http

我的 observable 是使用以下代码创建的:

fakeObservable = Observable.create(obs => 
  obs.next([1, 2, 3]);
  obs.complete();
);

问题是,这个 observable 立即发出。有没有办法为其发射添加自定义延迟?


跟踪

我试过了:

fakeObservable = Observable.create(obs => 
  setTimeout(() => 
    obs.next([1, 2, 3]);
    obs.complete();
  , 100);
);

但它似乎不起作用。

【问题讨论】:

github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/… 我尝试将.create(...).delay(1000) 链接起来,但没有成功:Observable_1.Observable.create(...).delay 不是函数。 你到底想完成什么? 你订阅 observable 了吗? 用我自己的 observable 伪造 Http 响应延迟。 @shusson 是的,我正在测试的课程正在为 observable 调用服务(我正在尝试模拟)以便订阅它。 【参考方案1】:

使用以下导入:

import Observable from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/delay';

试试这个:

let fakeResponse = [1,2,3];
let delayedObservable = Observable.of(fakeResponse).delay(5000);
delayedObservable.subscribe(data => console.log(data));

更新:RXJS 6

上述解决方案在较新版本的 RXJS(例如 angular)中不再适用。

所以场景是我有一组项目要使用 API 检查。 API 只接受一个项目,我不想通过一次发送所有请求来终止 API。所以我需要在 Observable 流上定时发布项目,中间有一点延迟。

使用以下导入:

import  from, of  from 'rxjs';
import  delay  from 'rxjs/internal/operators';
import  concatMap  from 'rxjs/internal/operators';

然后使用以下代码:

const myArray = [1,2,3,4];

from(myArray).pipe(
        concatMap( item => of(item).pipe ( delay( 1000 ) ))
    ).subscribe ( timedItem => 
        console.log(timedItem)
    );

它基本上为数组中的每个项目创建一个新的“延迟”Observable。可能还有很多其他方法可以做到这一点,但这对我来说效果很好,并且符合“新”RXJS 格式。

【讨论】:

“typeof Observable”类型上不存在“of”属性。您是否使用 import Observable from 'rxjs/Observable'; 导入您的 Observable? 从此页面:npmjs.com/package/rxjs。我推断我必须使用import 'rxjs/add/observable/of'; 显式导入。你碰巧做同样的事情吗?但这仍然很奇怪,因为它不会与 .delay(...) 链接,并且当我尝试 rxjs/add/observable/delay...时会显示错误... 应该 of(item.pipe ( delay( 1000 ) ))of(item))).pipe(delay(1000) 尝试管道阵列给了我错误 这对我使用 rxjs6 有用: from([1, 2, 3, 4, 5, 6, 7]).pipe(concatMap(num => of(num).pipe(延迟(1000)))) .subscribe(x => console.log(x)); @MikeOne 的解决方案也对我有用。很遗憾,这么简单的事情需要这么多代码......【参考方案2】:

在 RxJS 5+ 中你可以这样做

import  Observable  from "rxjs/Observable";
import  of  from "rxjs/observable/of";
import  delay  from "rxjs/operators";

fakeObservable = of('dummy').pipe(delay(5000));

在 RxJS 6+ 中

import  of  from "rxjs";
import  delay  from "rxjs/operators";

fakeObservable = of('dummy').pipe(delay(5000));

如果你想延迟每个发出的值试试

from([1, 2, 3]).pipe(concatMap(item => of(item).pipe(delay(1000))));

【讨论】:

我认为最干净的解决方案。 这个“解决方案”只有在你发出一个项目时才有效。不会为 observable 中的每个元素调用延迟运算符。这就是为什么需要可怕的 concatMap 解决方案。 @RickO'Shea,问题是关于一个发射值,所以这就是这个解决方案的原因。 好清新好干净! 像魅力一样工作!【参考方案3】:

你想要的是一个计时器:

// RxJS v6+
import  timer  from 'rxjs';

//emit [1, 2, 3] after 1 second.
const source = timer(1000).map(([1, 2, 3]);
//output: [1, 2, 3]
const subscribe = source.subscribe(val => console.log(val));

【讨论】:

好答案,别忘了退订【参考方案4】:

现在回答有点晚了......但以防万一有人回到这个问题寻找答案

'delay' 是 Observable 的属性(函数)

fakeObservable = Observable.create(obs => 
  obs.next([1, 2, 3]);
  obs.complete();
).delay(3000);

这对我有用...

【讨论】:

import 'rxjs/add/operator/delay' 现在给出此错误:找不到模块:错误:无法解析 'rxjs/add/operator/delay' 当它非常真实时,为什么你会称你为可观察的假货? :)【参考方案5】:

import * as Rx from 'rxjs/Rx';

我们应该添加上面的导入以使打击代码工作

Let obs = Rx.Observable
    .interval(1000).take(3);

obs.subscribe(value => console.log('Subscriber: ' + value));

【讨论】:

import * 对于这个简单的任务来说有点矫枉过正

以上是关于如何延迟创建可观察对象的主要内容,如果未能解决你的问题,请参考以下文章

我如何从可观察对象创建对象,该对象返回对象数组

如何根据从另一个可观察对象返回的数组中的键创建可观察/ http请求数组

如何在核心数据上创建可观察对象并将其绑定到 tableView

对有延迟的 Rxjava 可观察对象进行单元测试

RxJs:根据字段值更改创建可观察对象

如何基于涉及第一个元素的谓词转换可观察对象