javascript 支持WriteStream的RxJS转换流
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了javascript 支持WriteStream的RxJS转换流相关的知识,希望对你有一定的参考价值。
const fs = require('fs')
const { map, take, tap } = require('rxjs/operators')
const { Subject } = require('rxjs')
const { Transform } = require('stream')
const createStreamObservable = (
readStream,
) => {
const chunk$ = new Subject()
const stream$ = new Subject()
const pullNextChunk = (
value,
) => {
chunk$
.next(value)
}
const transformStream = (
new Transform({
readableObjectMode: true,
transform(
chunk,
encoding,
callback,
) {
chunk$
.pipe(
take(1),
)
.subscribe(value => {
this
.push(value)
callback()
})
stream$
.next(chunk)
},
writableObjectMode: true,
})
)
const transformedReadStream = (
readStream
.pipe(transformStream)
.on(
'finish',
() => {
chunk$
.complete()
stream$
.complete()
}
)
)
return {
pullNextChunk,
stream$,
transformedReadStream, // Give access to the Node.js stream in case more native piping is required
}
}
const readStream = (
fs
.createReadStream('./novel.txt')
)
const writeStream = (
fs
.createWriteStream('./uppercaseNovel.txt')
)
const {
pullNextChunk,
stream$,
transformedReadStream,
} = createStreamObservable(readStream)
// Pipe to a `writeStream` or stream$ will never complete
transformedReadStream
.pipe(writeStream)
stream$
.pipe(
map(chunk => (
chunk
.toString()
)),
tap(() => {
console
.info('INCOMING CHUNK')
}),
map(stringifiedChunk => (
stringifiedChunk
.toUpperCase()
)),
tap(() => {
console
.info('DONE PROCESSING CHUNK')
}),
)
.subscribe({
complete: () => {
console
.info('DONE PROCESSING ALL CHUNKS')
},
next: pullNextChunk,
})
// INCOMING CHUNK
// DONE PROCESSING CHUNK
// ...
// INCOMING CHUNK
// DONE PROCESSING CHUNK
// DONE PROCESSING ALL CHUNKS
// uppercaseNovel.txt
// ... (lots of text) lorem ipsum
// ... (lots of uppercase text) LOREM IPSUM
以上是关于javascript 支持WriteStream的RxJS转换流的主要内容,如果未能解决你的问题,请参考以下文章
Spark结构化流多个WriteStream到同一个接收器
如何在单个 Spark 作业中调用多个 writeStream 操作?
我可以将 writeStream.bytesWritten 与管道一起使用吗?
是否可以通过 spark 直接将 Writestream 用于 API
writestream 完成时如何返回承诺? [复制]
如何在 writeStream 中访问数组类型中的元素?