RxJava2 发布

Posted

技术标签:

【中文标题】RxJava2 发布【英文标题】:RxJava2 Publish 【发布时间】:2018-02-06 09:44:24 【问题描述】:

有什么区别

ObservableTransformer 
    Observable.merge(
        it.ofType(x).compose(transformerherex),
        it.ofType(y).compose(transformerherey)
    )

ObservableTransformer 
    it.publish shared ->
        Observable.merge(
            shared.ofType(x).compose(transformerherex),
            shared.ofType(y).compose(transformerherey)
        )
    

当我使用这两个运行我的代码时,我得到了相同的结果。发布在这里做什么。

【问题讨论】:

【参考方案1】:

不同之处在于***转换器将订阅上游两次,从下游一次订阅,复制上游的任何通常不想要的副作用:

Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3)
      .doOnSubscribe(s -> System.out.println("Subscribed!"));


mixedSource.compose(f ->
   Observable.merge(
      f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
      f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
   )
)
.subscribe(System.out::println);

将打印

Subscribed!
2
3
4
Subscribed!
A
B
C

这里表示的副作用是打印输出Subscribed! 根据实际源中的实际工作,这可能意味着发送两次电子邮件,两次检索表的行。通过这个特定的示例,您可以看到,即使源值在它们的类型中交错,输出也会单独包含它们。

相比之下,publish(Function) 将为每个最终订阅者建立一个源订阅,因此源的任何副作用只发生一次。

mixedSource.publish(f ->
   Observable.merge(
      f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
      f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
   )
)
.subscribe(System.out::println);

打印出来的

Subscribed!
A
2
B
3
C
4

因为源订阅一次,每个项目都被多播到.ofType().compose() 的两个“臂”。

【讨论】:

我一直在使用 Jake Wharton 演讲“管理状态”中的这个,直到现在我才知道它的作用。这么简洁明了,非常感谢。【参考方案2】:

publish 运算符将您的 Observable 转换为 Connectable Observable

让我们看看Connectable Observable 是什么意思:假设您想要多次订阅 observable 并且想要为每个订阅者提供相同的项目。您需要使用Connectable Observable

示例:

var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period).Publish();
observable.Connect();
observable.Subscribe(i => Console.WriteLine("first subscription : 0", i));
Thread.Sleep(period);
observable.Subscribe(i => Console.WriteLine("second subscription : 0", i));

输出:

first subscription : 0 
first subscription : 1 
second subscription : 1 
first subscription : 2 
second subscription : 2

在这种情况下,我们可以在第一个项目发布之前快速订阅,但仅限于第一个订阅。第二次订阅晚了,错过了第一次发布。

我们可以移动 Connect() 方法的调用,直到完成所有订阅。这样,即使调用了 Thread.Sleep,我们也不会真正订阅底层证券,直到两个订阅都完成之后。这将按如下方式完成:

var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period).Publish();
observable.Subscribe(i => Console.WriteLine("first subscription : 0", i));
Thread.Sleep(period);
observable.Subscribe(i => Console.WriteLine("second subscription : 0", i));
observable.Connect();

输出:

first subscription : 0 
second subscription : 0 
first subscription : 1 
second subscription : 1 
first subscription : 2 
second subscription : 2 

所以使用 Completable Observable,我们可以控制何时让 Observable 发射项目。

示例来自:http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#PublishAndConnect

编辑 根据this link中的第180张幻灯片:

发布的另一个性质是,如果任何观察者在 10 秒后开始观察 observable 开始发射项目,则观察者只会获得 10 秒后(订阅时)发出的项目,而不是所有项目。所以在侧面,我可以理解发布正在用于 UI 事件。任何观察者都应该只接收在它订阅之后执行的那些事件,而不是之前发生的所有事件,这是完全有道理的。

希望对你有帮助。

【讨论】:

感谢@chandil03。是的,我知道发布的想法。但是看看幻灯片 180 上的这个link。为什么需要在调用Observable.merge() 之前发布。 @NovoDimaporo 请检查编辑,如果您有任何疑问,请告诉我。 好吧,如果你能看到整个代码,那么只有 1 个订阅发生,没有进一步的订阅。正如您在 observableTransformer 中看到的那样,已发布的 observable 实际上并没有显示在外部,除了在 'Observable.merge()' 上,那么它的用途是什么? @NovoDimaporo 你为什么要拿简单的案子?简单案例通常不会描述真实的用例。如果您需要使用两个订户怎么办。如果您希望您的观察者在使用connect() 运算符的一组操作后发出项目怎么办。如果您没有任何描述的用例,那么是的,这里没有使用publish 运算符。 @chandil03 “Hohenheim”所说的是发布操作符,它接受一个函数作为参数(这返回一个 Observable 而不是一个 ConnectableObservable),而不是没有参数的 publish() 操作符。

以上是关于RxJava2 发布的主要内容,如果未能解决你的问题,请参考以下文章

RxJava2系列:rxjava2简答使用

RXJava2 管理订阅

RxJava2.0学习笔记2 2018年3月29日 星期四

Android实战——RxJava2+Retrofit+RxBinding解锁各种新姿势

RxJava2:如何改进并行数据下载和缓存?

带有 rxJava2 和改造的 UndeliverableException