Rxjs【take, first, takeUntil, concatAll】

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjs【take, first, takeUntil, concatAll】相关的知识,希望对你有一定的参考价值。

参考技术A 这篇文章是我的Angular Rxjs Series中的第篇三文章,在继续阅读本文之前,您至少应该熟悉系列中的第一篇基础文章:

Rxjs6都改变了些什么?

Rxjs【Observable】

Marble Diagrams【宝珠图】

采取(1)与首先()

【中文标题】采取(1)与首先()【英文标题】:take(1) vs first() 【发布时间】:2017-07-09 19:34:10 【问题描述】:

我发现了一些使用take(1)AuthGuards 实现。在我的项目中,我使用了first()

两者的工作方式相同吗?

import 'rxjs/add/operator/map';
import 'rxjs/add/operator/first';
import  Observable  from 'rxjs/Observable';

import  Injectable  from '@angular/core';
import  CanActivate, Router, ActivatedRouteSnapshot, RouterStateSnapshot  from '@angular/router';
import  AngularFire  from 'angularfire2';

@Injectable()
export class AuthGuard implements CanActivate 

    constructor(private angularFire: AngularFire, private router: Router)  

    canActivate(route: ActivatedRouteSnapshot, state: RouterStateSnapshot): Observable<boolean> | boolean 
        return this.angularFire.auth.map(
            (auth) =>  
                if (auth) 
                    this.router.navigate(['/dashboard']);
                    return false;
                 else 
                    return true;
                
            
        ).first(); // Just change this to .take(1)
    

【问题讨论】:

【参考方案1】:

事实证明,这两种方法之间有一个非常重要的区别:如果流在发出值之前完成,first() 将发出错误。或者,如果您提供了谓词 (i.e. first(value =&gt; value === 'foo')),如果流在发出通过谓词的值之前完成,它将发出错误。

另一方面,

take(1) 如果一个值从未从流中发出,则会很高兴地继续进行。这是一个简单的例子:

const subject$ = new Subject();

// logs "no elements in sequence" when the subject completes
subject$.first().subscribe(null, (err) => console.log(err.message));

// never does anything
subject$.take(1).subscribe(console.log);

subject$.complete();

另一个例子,使用谓词:

const observable$ = of(1, 2, 3);

// logs "no elements in sequence" when the observable completes
observable$
 .first((value) => value > 5)
 .subscribe(null, (err) => console.log(err.message));

// the above can also be written like this, and will never do
// anything because the filter predicate will never return true
observable$
 .filter((value) => value > 5);
 .take(1)
 .subscribe(console.log);

作为 RxJS 的新手,这种行为让我非常困惑,尽管这是我自己的错,因为我做了一些不正确的假设。如果我费心检查文档,我会看到行为是clearly documented:

如果未提供 defaultValue 且未找到匹配元素,则会引发错误。

我经常遇到这种情况的原因是一个相当常见的 Angular 2 模式,在 OnDestroy 生命周期挂钩期间手动清理可观察对象:

class MyComponent implements OnInit, OnDestroy 
  private stream$: Subject = someDelayedStream();
  private destroy$ = new Subject();

  ngOnInit() 
    this.stream$
      .takeUntil(this.destroy$)
      .first()
      .subscribe(doSomething);
  

  ngOnDestroy() 
    this.destroy$.next(true);
  

代码起初看起来是无害的,但是当在 stream$ 之前被销毁的组件可以发出值时,就会出现问题。因为我使用的是first(),所以在销毁组件时会抛出错误。我通常只订阅流以获取要在组件中使用的值,因此我不在乎组件是否在流发出之前被销毁。正因为如此,我已经开始在几乎所有以前使用first() 的地方使用take(1)

filter(fn).take(1)first(fn) 更冗长一些,但在大多数情况下,我更喜欢处理一些最终对应用程序没有影响的错误。

同样需要注意:last()takeLast(1) 也是如此。

reference

【讨论】:

【参考方案2】:

运营商 first()take(1) 不一样。

first() 运算符采用可选的predicate 函数,并在源完成时没有匹配的值时发出error 通知。

例如,这将发出错误:

import  EMPTY, range  from 'rxjs';
import  first, take  from 'rxjs/operators';

EMPTY.pipe(
  first(),
).subscribe(console.log, err => console.log('Error', err));

...还有这个:

range(1, 5).pipe(
  first(val => val > 6),
).subscribe(console.log, err => console.log('Error', err));

虽然这将匹配发出的第一个值:

range(1, 5).pipe(
  first(),
).subscribe(console.log, err => console.log('Error', err));

另一方面,take(1) 只取第一个值并完成。不涉及进一步的逻辑。

range(1, 5).pipe(
  take(1),
).subscribe(console.log, err => console.log('Error', err));

然后使用空源 Observable 就不会发出任何错误:

EMPTY.pipe(
  take(1),
).subscribe(console.log, err => console.log('Error', err));

2019 年 1 月:针对 RxJS 6 更新

【讨论】:

请注意,我并没有说first()take() 通常相同,我认为这很明显,只是first()take(1) 相同。我不确定您的回答是否仍然存在差异? @GünterZöchbauer 实际上,他们的行为是不同的。如果源没有发出任何东西并完成,那么first() 发送错误通知,而take(1) 根本不会发出任何东西。 @martin,在某些情况下 take(1) 不会发出任何表示调试代码会更难的意思? @Karuban 这真的取决于你的用例。如果没有收到任何值是出乎意料的,我建议使用first()。如果这是一个有效的应用程序状态,我会选择take(1) 这类似于 .NET 的 .First().FirstOrDefault() (想想它也是 .Take(1) ,因为 First 需要集合中的某些内容并为空集合提供错误 - 和FirstOrDefault().Take(1) 都允许集合为空,并分别返回 null 和空集合。【参考方案3】:

这里有三个 Observable ABC 用大理石图来探索 firsttakesingle 运算符之间的区别:

* 图例--o-- ----! 错误----| 完成

https://thinkrx.io/rxjs/first-vs-take-vs-single/ 玩它。

已经有了所有答案,我想添加一个更直观的解释

希望对某人有所帮助

【讨论】:

first() 在收到第一个值后是否完成? @FernandoGabrieli,是的!它在发出第一个值后立即完成。在可视化中,(0) 后面的 | 表明了这一点。更多详情thinkrx.io/rxjs/first【参考方案4】:

提示:仅在以下情况下使用first()

您认为发出零个项目是一种错误情况(例如,在发出之前完成)并且 如果错误的可能性大于 0%,您可以优雅地处理它您知道 100% 的源 observable 将发出 1+ 个项目(因此永远不能抛出)

如果排放为零并且您没有明确处理它(使用catchError),那么该错误将被向上传播,可能会导致其他地方出现意外问题并且可能很难追踪 - 特别是如果它来自最终用户。

如果您在大多数情况下使用take(1),则更安全

如果源在没有发射的情况下完成,take(1) 不会发射任何东西是可以的。 您不需要使用内联谓词(例如first(x =&gt; x &gt; 10)

注意:您可以将谓词与take(1) 一起使用,如下所示:.pipe( filter(x =&gt; x &gt; 10), take(1) )。如果没有任何东西大于 10,这没有错误。

single()呢?

如果您想更严格,并且不允许两次排放,您可以使用single(),如果有零或 2+ 排放,则会出错。在这种情况下,您需要再次处理错误。

提示:Single 有时会很有用,如果你想确保你的 observable 链不会做额外的工作,比如调用一个 http 服务两次并发出两个 observables。将single 添加到管道的末尾会让您知道您是否犯了这样的错误。我在“任务运行器”中使用它,您在其中传递一个只应发出一个值的可观察任务,因此我通过single(), catchError() 传递响应以保证良好的行为。


为什么不总是使用first() 而不是take(1)

又名。 first可能如何导致更多错误?

如果你有一个 observable 从服务中获取一些东西,然后通过first() 管道它,你应该没问题。但是,如果有人出于某种原因禁用该服务 - 并将其更改为发出 of(null)NEVER ,那么任何下游 first() 操作员都会开始抛出错误。

现在我意识到这可能正是您想要的 - 因此这只是一个提示。运营商first 吸引了我,因为它听起来比take(1) 稍微不那么“笨拙”,但如果有可能不发出源,你需要小心处理错误。不过这完全取决于你在做什么。


如果有默认值(常量):

如果你有一个默认值,如果没有发出任何内容,也可以考虑.pipe(defaultIfEmpty(42), first())。这当然不会引发错误,因为first 总是会收到一个值。

请注意,defaultIfEmpty 仅在流为空时触发,而不是在发出的值为 null 时触发。

【讨论】:

请注意singlefirst 有更多不同。 1. 它只会在complete 上发出值。这意味着如果 observable 发出一个值但从未完成,那么 single 将永远不会发出一个值。 2. 出于某种原因,如果您将过滤器函数传递给不匹配任何内容的 single,如果原始序列不为空,它将发出 undefined 值,@987654352 不是这种情况@. 关于 2. 这是一个 bug 现在已修复。【参考方案5】:

似乎在 RxJS 5.2.0 中,.first() 运算符有一个 bug,

由于该错误,.take(1).first() 如果您将它们与 switchMap 一起使用,它们的行为可能会完全不同:

使用take(1),您将获得预期的行为:

var x = Rx.Observable.interval(1000)
   .do( x=> console.log("One"))
   .take(1)
   .switchMap(x => Rx.Observable.interval(1000))
   .do( x=> console.log("Two"))
   .subscribe((x) => )

// In the console you will see:
// One
// Two
// Two
// Two
// Two
// etc...

但是使用.first() 你会得到错误的行为:

var x = Rx.Observable.interval(1000)
  .do( x=> console.log("One"))
  .first()
  .switchMap(x => Rx.Observable.interval(1000))
  .do( x=> console.log("Two"))
  .subscribe((x) => )

// In console you will see:
// One
// One
// Two
// One
// Two
// One
// etc... 

这是codepen的链接

【讨论】:

【参考方案6】:

有一个非常重要的区别在任何地方都没有提到。

take(1) 发出 1,完成,取消订阅

first() 发出 1,完成,但不取消订阅。

这意味着您的上游可观察对象在 first() 之后仍然很热,这可能不是预期的行为。

UPD:这是指 RxJS 5.2.0。这个问题可能已经解决了。

【讨论】:

我认为没有人退订,请参阅jsbin.com/nuzulorota/1/edit?js,console。 是的,两个算子都完成了订阅,区别在于错误处理。如果该 observable 没有发出值并且仍然尝试使用第一个运算符获取第一个值,它将引发错误。如果我们用 take(1) 操作符替换它,即使订阅发生时流中没有值,它也不会抛出错误。 澄清一下:两者都取消订阅。 @weltschmerz 的示例过于简化,它不会运行,直到它可以自行取消订阅。这个扩展了一点:repl.it/repls/FrayedHugeAudacity

以上是关于Rxjs【take, first, takeUntil, concatAll】的主要内容,如果未能解决你的问题,请参考以下文章

使用 rxjs take() 和 Angular 的 http [重复]

RxJS Subject学习

当我在 Windows 7 中运行“first”或“take”方法时 pyspark 崩溃

任何需要先取消订阅 RxJS()

Rxjs中Notification 介绍

rxjs——subject和Observable的区别