Android :RxJava学习笔记之SingleCompletable以及Maybe

Posted JMW1407

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Android :RxJava学习笔记之SingleCompletable以及Maybe相关的知识,希望对你有一定的参考价值。

Single、Completable以及Maybe概述

类型描述
Observable < T >能够发射0或n个数据,并以成功或错误事件终止。
Flowable< T>能够发射0或n个数据,并以成功或错误事件终止。 支持Backpressure,可以控制数据源发射的速度。
Single< T>只发射单个数据或错误事件。
Completable它从来不发射数据,只处理 onComplete 和 onError 事件。可以看成是Rx的Runnable。
Maybe< T>能够发射0或者1个数据,要么成功,要么失败。有点类似于Optional

1、Single

Android :RxJava学习笔记之Single

从SingleEmitter的源码可以看出,Single 只有 onSuccess 和 onError 事件。

/**
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex;

import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Cancellable;

/**
 * Abstraction over an RxJava {@link SingleObserver} that allows associating
 * a resource with it.
 * <p>
 * All methods are safe to call from multiple threads.
 * <p>
 * Calling onSuccess or onError multiple times has no effect.
 *
 * @param <T> the value type to emit
 */
public interface SingleEmitter<T> {

    /**
     * Signal a success value.
     * @param t the value, not null
     */
    void onSuccess(@NonNull T t);

    /**
     * Signal an exception.
     * @param t the exception, not null
     */
    void onError(@NonNull Throwable t);

    /**
     * Sets a Disposable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param s the disposable, null is allowed
     */
    void setDisposable(@Nullable Disposable s);

    /**
     * Sets a Cancellable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param c the cancellable resource, null is allowed
     */
    void setCancellable(@Nullable Cancellable c);

    /**
     * Returns true if the downstream cancelled the sequence.
     * @return true if the downstream cancelled the sequence
     */
    boolean isDisposed();
}

其中,onSuccess()用于发射数据(在Observable/Flowable中使用onNext()来发射数据)。而且只能发射一个数据,后面即使再发射数据也不会做任何处理。

Single的SingleObserver中只有onSuccess、onError,并没有onComplete。这是 Single 跟其他四种被观察者最大的区别。

Single 可以通过toXXX方法转换成Observable、Flowable、Completable以及Maybe。

List<String> s = Arrays.asList("Java", "android", "Ruby", "ios", "Swift");
 Single
        .just(s)
        .toObservable()
        .flatMapIterable( list -> list)
        .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
            System.out.println(s);
          }
        });

2、Completable

Completable在创建后,不会发射任何数据。从CompletableEmitter的源码可以看到

/**
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex;

import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Cancellable;

/**
 * Abstraction over an RxJava {@link CompletableObserver} that allows associating
 * a resource with it.
 * <p>
 * All methods are safe to call from multiple threads.
 * <p>
 * Calling onComplete or onError multiple times has no effect.
 */
public interface CompletableEmitter {

    /**
     * Signal the completion.
     */
    void onComplete();

    /**
     * Signal an exception.
     * @param t the exception, not null
     */
    void onError(@NonNull Throwable t);

    /**
     * Sets a Disposable on this emitter; any previous Disposable
     * or Cancellation will be disposed/cancelled.
     * @param d the disposable, null is allowed
     */
    void setDisposable(@Nullable Disposable d);

    /**
     * Sets a Cancellable on this emitter; any previous Disposable
     * or Cancellation will be disposed/cancelled.
     * @param c the cancellable resource, null is allowed
     */
    void setCancellable(@Nullable Cancellable c);

    /**
     * Returns true if the downstream disposed the sequence.
     * @return true if the downstream disposed the sequence
     */
    boolean isDisposed();
}

Completable 只有 onComplete 和 onError 事件,同时 Completable 并没有map、flatMap等操作符,它的操作符比起 Observable/Flowable 要少得多。

Completable.create(new CompletableOnSubscribe() {
          @Override
          public void subscribe(CompletableEmitter e) throws Exception {
            System.out.println("start send data");
            //不发送数据,只发送一个完成信号或者异常信息
            e.onComplete();
            //e.onError(new RuntimeException("exec"));
          }
        }).observeOn(Schedulers.newThread()) //发布者线程
        .subscribeOn(Schedulers.io())//订阅者线程
        .subscribe(new CompletableObserver(){
          @Override
          public void onSubscribe(Disposable d) {
            System.out.println("onSubscribe");
          }
          @Override
          public void onComplete() {
            System.out.println("onComplete");
          }
          @Override
          public void onError(Throwable e) {
            System.out.println("onError:" + e);
          }
        });

    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

andThen( Completable中的方法最常用):在这个操作符中你可以传任何Observable、Single、Flowable、Maybe或者其他Completable,它们会在原来的 Completable 结束后执行

Completable.create(new CompletableOnSubscribe() {
          @Override
          public void subscribe(@NonNull CompletableEmitter emitter) throws Exception {
            try {
              TimeUnit.SECONDS.sleep(1);
              emitter.onComplete();
              System.out.println("onComplete: ");
            } catch (InterruptedException e) {
              emitter.onError(e);
            }
          }
        }).andThen(Observable.range(1, 10))
        .subscribe(new Consumer<Integer>() {
          @Override
          public void accept(@NonNull Integer integer) throws Exception {
            System.out.println(integer);
          }
        });
  }

输出

1
2
3
4
5
6
7
8
9
10
onComplete: 

在这里emitter.onComplete()执行完之后,表明Completable已经完全执行完毕,接下来是执行andThen里的操作。

在Completable中,andThen有多个重载的方法,正好对应了五种被观察者的类型。

Completable       andThen(CompletableSource next)
<T> Maybe<T>      andThen(MaybeSource<T> next)
<T> Observable<T> andThen(ObservableSource<T> next)
<T> Flowable<T>   andThen(Publisher<T> next)
<T> Single<T>     andThen(SingleSource<T> next)

Completable 也可以通过toXXX方法转换成Observable、Flowable、Single以及Maybe。

3、Maybe

Maybe 是 RxJava2.x 之后才有的新类型,可以看成是Single和Completable的结合。

Maybe创建之后,MaybeEmitter 和 SingleEmitter 一样并没有onNext()方法,同样需要通过onSuccess()方法来发射数据。

 Maybe
        .create(new MaybeOnSubscribe<String>() {
          @Override
          public void subscribe(MaybeEmitter<String> emitter) throws Exception {
            emitter.onSuccess("hello");
          }
        })
        .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
            System.out.println("test: " + s);
          }
        });

输出

test: hello

Maybe也只能发射0或者1个数据,即使发射多个数据,后面发射的数据也不会处理。

Maybe
        .create(new MaybeOnSubscribe<String>() {
          @Override
          public void subscribe(MaybeEmitter<String> emitter) throws Exception {
            emitter.onSuccess("hello");
            emitter.onSuccess("world");
          }
        })
        .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
            System.out.println("test: " + s);
          }
        });

输出

test: hello

如果MaybeEmitter先调用了onComplete(),即使后面再调用了onSuccess()也不会发射任何数据。

Maybe
        .create(new MaybeOnSubscribe<String>() {
          @Override
          public void subscribe(MaybeEmitter<String> emitter) throws Exception {
            emitter.onComplete();
            emitter.onSuccess("hello");
          }
        })
        .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
            System.out.println("test: " + s);
          }
        });

这次就没有打印任何数据了。

我们对上面的代码再做一下修改,在subscribe()中也加入onComplete(),看看打印出来的结果会是这样的?因为SingleObserver中是没有onComplete()方法。

Maybe.create(new MaybeOnSubscribe<String>() {

            @Override
            public void subscribe(@NonNull MaybeEmitter<String> e) throws Exception {
                e.onComplete();
                e.onSuccess("testA");
            }
        }).subscribe(new Consumer<String>() {

            @Override
            public void accept(@NonNull String s) throws Exception {

                System.out.println("s=" + s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {

            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("Maybe onComplete");
            }
        });

这次打印的结果是

Maybe onComplete

通过查看Maybe相关的源码

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError,
            Action onComplete) {
        ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        return subscribeWith(new MaybeCallbackObserver<T>(onSuccess, onError, onComplete));
    }

我们可以得到,Maybe在没有数据发射时候subscribe会调用MaybeObserver的onComplete()。如果Maybe有数据发射或者调用了onError(),是不会再执行MaybeObserver的onComplete()。

我们也可以将 Maybe 转换成Observable、Flowable、Single,只需相应地调用toObservable()、toFlowable()、toSingle()。

4、总结

  • 1、Single:只发射一条单一的数据,或者条异常通知不能发射完成通知,其中数据与通知只能二选一发射。

  • 2、Completable:只发射一条完成通知或者发射一条异常通知,不能发射数据

  • 3、Maybe:可以发射一条单一的数据以及一条异常通知或者一条完成通知,需要注意的是,异常通知和完成通知只能选择其中一个,发射数据只能知完成通知或者异常通知之前,否则发送数据无效

参考

1、RxJava的Single、Completable以及Maybe

以上是关于Android :RxJava学习笔记之SingleCompletable以及Maybe的主要内容,如果未能解决你的问题,请参考以下文章

Android :RxJava学习笔记之过滤操作符

Android :RxJava学习笔记之条件/布尔操作符

Android :RxJava学习笔记之Subject

Android :RxJava学习笔记之Single

Android :RxJava学习笔记之创建操作符

Android :RxJava学习笔记之转换操作符