javascript RxJS转换流使用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了javascript RxJS转换流使用相关的知识,希望对你有一定的参考价值。

const fs = require('fs')
const { delay, map, tap } = require('rxjs/operators')

const createTransformStreamObservable = require('./createTransformStreamObservable')

const readStream = (
  fs
  .createReadStream('./novel.txt')
)

const writeStream = (
  fs
  .createWriteStream('./uppercaseNovel.txt')
)

const transformStream$ = (
  createTransformStreamObservable(
    readStream,
  )
)

// Pipe to a `writeStream` or stream$ will never complete
transformStream$
.stream
.pipe(writeStream)

transformStream$
.pipe(
  tap(() => {
    console
    .info('INCOMING CHUNK')
  }),
  delay(1000), // This could be an AJAX call
  map(chunk => (
    chunk
    .toString()
    .toUpperCase()
  )),
  tap(() => {
    console
    .info('DONE PROCESSING CHUNK')
  }),
  tap(
    transformStream$
    .push
  ),
)
.subscribe({
  complete: () => {
    console
    .info('DONE PROCESSING ALL CHUNKS')
  },
  next: (
    transformStream$
    .next
  ),
})

// INCOMING CHUNK
// DONE PROCESSING CHUNK
// ...
// INCOMING CHUNK
// DONE PROCESSING CHUNK
// DONE PROCESSING ALL CHUNKS

以上是关于javascript RxJS转换流使用的主要内容,如果未能解决你的问题,请参考以下文章

javascript RxJS变换流可观察

nodejs中的RXJS PostgreSQL背压

rxjs简单入门

响应式编程实战——RxJS 手动停止事件流的正确方式

作为前端,你需要知道 RxJS(响应式编程-流)

rxjs简单入门