javascript 支持WriteStream的RxJS转换流

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了javascript 支持WriteStream的RxJS转换流相关的知识,希望对你有一定的参考价值。

const fs = require('fs')
const { map, take, tap } = require('rxjs/operators')
const { Subject } = require('rxjs')
const { Transform } = require('stream')

const createStreamObservable = (
  readStream,
) => {
  const chunk$ = new Subject()
  const stream$ = new Subject()

  const pullNextChunk = (
    value,
  ) => {
    chunk$
    .next(value)
  }

  const transformStream = (
    new Transform({
      readableObjectMode: true,

      transform(
        chunk,
        encoding,
        callback,
      ) {
        chunk$
        .pipe(
          take(1),
        )
        .subscribe(value => {
          this
          .push(value)

          callback()
        })

        stream$
        .next(chunk)
      },

      writableObjectMode: true,
    })
  )

  const transformedReadStream = (
    readStream
    .pipe(transformStream)
    .on(
      'finish',
      () => {
        chunk$
        .complete()

        stream$
        .complete()
      }
    )
  )

  return {
    pullNextChunk,
    stream$,
    transformedReadStream, // Give access to the Node.js stream in case more native piping is required
  }
}

const readStream = (
  fs
  .createReadStream('./novel.txt')
)

const writeStream = (
  fs
  .createWriteStream('./uppercaseNovel.txt')
)

const {
  pullNextChunk,
  stream$,
  transformedReadStream,
} = createStreamObservable(readStream)

// Pipe to a `writeStream` or stream$ will never complete
transformedReadStream
.pipe(writeStream)

stream$
.pipe(
  map(chunk => (
    chunk
    .toString()
  )),
  tap(() => {
    console
    .info('INCOMING CHUNK')
  }),
  map(stringifiedChunk => (
    stringifiedChunk
    .toUpperCase()
  )),
  tap(() => {
    console
    .info('DONE PROCESSING CHUNK')
  }),
)
.subscribe({
  complete: () => {
    console
    .info('DONE PROCESSING ALL CHUNKS')
  },
  next: pullNextChunk,
})

// INCOMING CHUNK
// DONE PROCESSING CHUNK
// ...
// INCOMING CHUNK
// DONE PROCESSING CHUNK
// DONE PROCESSING ALL CHUNKS

// uppercaseNovel.txt
// ... (lots of text) lorem ipsum
// ... (lots of uppercase text) LOREM IPSUM

以上是关于javascript 支持WriteStream的RxJS转换流的主要内容,如果未能解决你的问题,请参考以下文章

Spark结构化流多个WriteStream到同一个接收器

如何在单个 Spark 作业中调用多个 writeStream 操作?

我可以将 writeStream.bytesWritten 与管道一起使用吗?

是否可以通过 spark 直接将 Writestream 用于 API

writestream 完成时如何返回承诺? [复制]

如何在 writeStream 中访问数组类型中的元素?