RxJS 订阅永远不会结束

Posted

技术标签:

【中文标题】RxJS 订阅永远不会结束【英文标题】:RxJS subscribe never finishes 【发布时间】:2017-11-05 02:47:30 【问题描述】:

我对 rxjs 还是很陌生。我在下面调用一个函数,读取完整的流并打印读取的控制台语句,但我从未看到“订阅完成”,我不知道为什么。完成这个流需要什么?有什么明显的问题吗?

const readline$ = RxNode.fromReadLineStream(rl)
    .filter((element, index, observable) => 
        if (index >= range.start && index < range.stop) 
            console.log(`kept line is $JSON.stringify(element)`);
            return true;
         else 
            console.log(`not keeping line $JSON.stringify(element)`);
            return false;
        
    )
    .concatMap(line => Rx.Observable.fromPromise(myFunction(line)))
    .do(response => console.log(JSON.stringify(response)));

readline$.subscribe(i =>  console.log(`Subscribe object: $util.inspect(i)`); ,
                  err =>  console.error(`Subscribe error: $util.inspect(err)`); ,
                 done =>  console.log("Subscribe done."); // NEVER CALLED
                           anotherFunc();                  // NEVER CALLED
                     
);

【问题讨论】:

我不确定这个特定的 observable,但不是每个 observable 都会完成。 你传递给RxNode.fromReadLineStream的节点流是什么?流本身会结束吗? 是的,它是一个有限长度文件的 RxNode.fromReadLineStream。 最终使用 readline.on 关闭事件来实现几乎相同的效果。 【参考方案1】:

从源代码中可以看出,只有在源流发出close 事件时才会发送完成通知。 https://github.com/Reactive-Extensions/rx-node/blob/master/index.js#L100-L102

因此,如果您需要调用正确的完整处理程序,则需要自己关闭流,请参阅How to close a readable stream (before end)?。 换句话说,Observable 在读取整个文件后不会自动完成。

【讨论】:

以上是关于RxJS 订阅永远不会结束的主要内容,如果未能解决你的问题,请参考以下文章

AngularJS / rxjs:订阅时观察不到错误

rxjs - 主题订阅者错过了一个值

Apollo 订阅解析器永远不会激活?

取消订阅 Rxjs finalize 操作符

RxJS:取消订阅嵌套订阅

ReadFromPubSub->CloudStorage->BigQuery:订阅的大小永远不会减少,而且似乎只有 0.002% 的订阅到达 BigQuery