javascript RxJS变换流可观察
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了javascript RxJS变换流可观察相关的知识,希望对你有一定的参考价值。
const { fromEvent, Subject } = require('rxjs')
const { take } = require('rxjs/operators')
const { Transform } = require('stream')
const createTransformStreamSubject = (
nodeJsStream,
) => {
const chunk$ = new Subject()
const push$ = new Subject()
const transformStream$ = new Subject()
transformStream$
._next = (
transformStream$
.next
.bind(transformStream$)
)
const transformStream = (
new Transform({
readableObjectMode: true,
transform(
chunk,
encoding,
callback,
) {
chunk$
.pipe(
take(1),
)
.subscribe(callback)
transformStream$
._next(chunk)
},
writableObjectMode: true,
})
)
push$
.subscribe(value => {
transformStream
.push(value)
})
const transformedStream = (
nodeJsStream
.pipe(transformStream)
)
transformStream$
.stream = transformedStream
fromEvent(
transformedStream,
'finish',
)
.subscribe(() => {
chunk$
.complete()
transformStream$
.complete()
push$
.complete()
})
transformStream$
.push = (
value,
) => {
push$
.next(value)
}
transformStream$
.next = () => {
chunk$
.next()
}
return transformStream$
}
module.exports = createTransformStreamSubject
以上是关于javascript RxJS变换流可观察的主要内容,如果未能解决你的问题,请参考以下文章
rxjs简单入门
Angular18 RXJS
rxjs简单入门
rxjs入门指南
可观察的数组到数组(Rxjs)
相当于 rxjs 中可观察到的 redux 'getState'