javascript RxJS变换流可观察

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了javascript RxJS变换流可观察相关的知识,希望对你有一定的参考价值。

const { fromEvent, Subject } = require('rxjs')
const { take } = require('rxjs/operators')
const { Transform } = require('stream')

const createTransformStreamSubject = (
  nodeJsStream,
) => {
  const chunk$ = new Subject()
  const push$ = new Subject()
  const transformStream$ = new Subject()

  transformStream$
  ._next = (
    transformStream$
    .next
    .bind(transformStream$)
  )

  const transformStream = (
    new Transform({
      readableObjectMode: true,

      transform(
        chunk,
        encoding,
        callback,
      ) {
        chunk$
        .pipe(
          take(1),
        )
        .subscribe(callback)

        transformStream$
        ._next(chunk)
      },

      writableObjectMode: true,
    })
  )

  push$
  .subscribe(value => {
    transformStream
    .push(value)
  })

  const transformedStream = (
    nodeJsStream
    .pipe(transformStream)
  )

  transformStream$
  .stream = transformedStream

  fromEvent(
    transformedStream,
    'finish',
  )
  .subscribe(() => {
    chunk$
    .complete()

    transformStream$
    .complete()

    push$
    .complete()
  })

  transformStream$
  .push = (
    value,
  ) => {
    push$
    .next(value)
  }

  transformStream$
  .next = () => {
    chunk$
    .next()
  }

  return transformStream$
}

module.exports = createTransformStreamSubject

以上是关于javascript RxJS变换流可观察的主要内容,如果未能解决你的问题,请参考以下文章

rxjs简单入门

Angular18 RXJS

rxjs简单入门

rxjs入门指南

可观察的数组到数组(Rxjs)

相当于 rxjs 中可观察到的 redux 'getState'