如何从无限流中创建Observable

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何从无限流中创建Observable相关的知识,希望对你有一定的参考价值。

我正在尝试将RxJava从1升级到2。在我的旧代码中,我有如下方法:

private Observable<Integer> reversRange(int from, int to) { Stream<Integer> intStream = Stream.iterate(to, p -> p - 1); return Observable.from(() -> intStream.iterator()) .takeWhile(n -> n > from) .map(n -> n ); }

但是现在在RxJava 2中,我不能使用from。这等价于什么代码?我在this question中发现它是fromIterable,但我不知道如何在Stream中使用它。

或其他示例,这不仅应用于范围,而且应用于任何无限流。

private Observable<Integer> numbers() { Stream<Integer> intStream = Stream.iterate(0, p -> p + 1); return Observable.from(() -> intStream.iterator()); }

答案
如果您只有StreamInteger,则只需执行以下操作:
另一答案
使用Observable.generate(() -> from, (value, emitter) -> { emitter.onNext(value); return value + 1; }); 功能:

以上是关于如何从无限流中创建Observable的主要内容,如果未能解决你的问题,请参考以下文章

从回调中创建一个 Observable

从数据数组中创建observable并将所有这些链接在一起

如何在颤动中创建无限网格?

如何将 observable 的值从服务传递到组件以更新模板

如何在Angular 2中创建RxJS主题?

如何从递归生成值的流中创建 akka-stream 源?