RxJS 订阅永远不会结束
Posted
技术标签:
【中文标题】RxJS 订阅永远不会结束【英文标题】:RxJS subscribe never finishes 【发布时间】:2017-11-05 02:47:30 【问题描述】:我对 rxjs 还是很陌生。我在下面调用一个函数,读取完整的流并打印读取的控制台语句,但我从未看到“订阅完成”,我不知道为什么。完成这个流需要什么?有什么明显的问题吗?
const readline$ = RxNode.fromReadLineStream(rl)
.filter((element, index, observable) =>
if (index >= range.start && index < range.stop)
console.log(`kept line is $JSON.stringify(element)`);
return true;
else
console.log(`not keeping line $JSON.stringify(element)`);
return false;
)
.concatMap(line => Rx.Observable.fromPromise(myFunction(line)))
.do(response => console.log(JSON.stringify(response)));
readline$.subscribe(i => console.log(`Subscribe object: $util.inspect(i)`); ,
err => console.error(`Subscribe error: $util.inspect(err)`); ,
done => console.log("Subscribe done."); // NEVER CALLED
anotherFunc(); // NEVER CALLED
);
【问题讨论】:
我不确定这个特定的 observable,但不是每个 observable 都会完成。 你传递给RxNode.fromReadLineStream
的节点流是什么?流本身会结束吗?
是的,它是一个有限长度文件的 RxNode.fromReadLineStream。
最终使用 readline.on 关闭事件来实现几乎相同的效果。
【参考方案1】:
从源代码中可以看出,只有在源流发出close
事件时才会发送完成通知。 https://github.com/Reactive-Extensions/rx-node/blob/master/index.js#L100-L102
因此,如果您需要调用正确的完整处理程序,则需要自己关闭流,请参阅How to close a readable stream (before end)?。 换句话说,Observable 在读取整个文件后不会自动完成。
【讨论】:
以上是关于RxJS 订阅永远不会结束的主要内容,如果未能解决你的问题,请参考以下文章
ReadFromPubSub->CloudStorage->BigQuery:订阅的大小永远不会减少,而且似乎只有 0.002% 的订阅到达 BigQuery