[RxJS] Implement RxJS `mergeMap` through inner Observables to Subscribe and Pass Values Through

Posted Answer1215

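Understanding sources and subscribers makes it much easier to understand what's going on with mergeMap under the hood. Where a typical operator invokes destination.next directly, mergeMap wraps destination.next inside a new source/subscriber pair, so there's an "outer" next and an "inner" next. The outer next receives each value from the source and maps it to an inner Observable; the inner next receives that Observable's values and passes them through to the destination.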

 

import { fromEvent, of, Subscriber } from "rxjs"
import { scan, delay } from "rxjs/operators"

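class MyMergeMapSubscriber extends Subscriber {
  constructor(sub, fn) {
    super(sub)

    // fn maps each outer value to an inner Observable
    this.fn = fn
  }

  // "outer" next: called once per value from the source
  _next(value) {
    console.log(`outer`, value)
    const o$ = this.fn(value)

    // "inner" next: subscribe to the inner Observable and pass its
    // values through to the destination (simplified: inner errors
    // and inner teardown are not handled here)
    o$.subscribe({
      next: value => {
        console.log(`  inner`, value)
        this.destination.next(value)
      }
    })
  }
}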

const myMergeMap = fn => source =>
  source.lift({
    call(sub, source) {
      // return the Subscription so lift can register it as teardown
      return source.subscribe(
        new MyMergeMapSubscriber(sub, fn)
      )
    }
  })

// count clicks, then map each count to an inner Observable
// that emits it after a 500 ms delay
const observable$ = fromEvent(
  document,
  "click"
).pipe(
  scan(i => i + 1, 0),
  myMergeMap(value => of(value).pipe(delay(500)))
)

const subscriber = {
  next: value => {
    console.log(value)
  },
  complete: () => {
    console.log("done")
  },
  error: value => {
    console.log(value)
  }
}

observable$.subscribe(subscriber)
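
The outer/inner split in MyMergeMapSubscriber can also be seen without RxJS. The following is only a sketch under a simplifying assumption: an "observable" is modeled as a plain function that takes an observer and pushes values into it. The names `fromArray`, `mapSketch`, `mergeMapSketch` and `collect` are hypothetical helpers made up for this illustration, not RxJS APIs, and the sketch ignores errors, completion and unsubscription.

```javascript
// Hypothetical, library-free model: an "observable" is a function that
// accepts an observer ({ next }) and pushes values into it synchronously.
const fromArray = values => observer => {
  values.forEach(v => observer.next(v))
}

// A typical operator: the only next handler calls destination.next directly.
const mapSketch = fn => source => destination => {
  source({
    next: value => destination.next(fn(value))
  })
}

// mergeMap-style operator: the "outer" next turns each source value into an
// inner observable and subscribes to it; only the "inner" next calls
// destination.next.
const mergeMapSketch = fn => source => destination => {
  source({
    next: outerValue => {
      const inner = fn(outerValue)
      inner({
        next: innerValue => destination.next(innerValue)
      })
    }
  })
}

// Helper: subscribe and gather everything that reaches the destination.
const collect = observable => {
  const out = []
  observable({ next: v => out.push(v) })
  return out
}

const mapped = collect(mapSketch(x => x * 10)(fromArray([1, 2, 3])))
const merged = collect(
  mergeMapSketch(x => fromArray([x, x * 10]))(fromArray([1, 2, 3]))
)

console.log(mapped) // [10, 20, 30]
console.log(merged) // [1, 10, 2, 20, 3, 30]
```

In `mapSketch` every source value produces exactly one call to destination.next, while in `mergeMapSketch` each source value can produce any number of calls, one per value emitted by its inner observable.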

 
