RxJava2.0中fromArray操作符用法和源码分析

Posted yuminfeng728

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava2.0中fromArray操作符用法和源码分析相关的知识,希望对你有一定的参考价值。

fromArray基本使用

fromArray用来创建一个Observable对象,可以将一个数组转化为可被观察的序列并且将它的数据逐个发射。

fromArray与just相似,都可以用来发射单个或一组数据,但是区别是当fromArray发射一组数据到观察序列中来时,它会先进行遍历,然后再逐个发射。而just发射一组数据时,会把它当成一个整体,一次性发射。字面意思难以理解,我们用代码说明:

Integer[] items =  0, 1, 2, 3, 4, 5 ;

Observable.fromArray(items).subscribe(new Consumer<Integer>() 
    @Override
    public void accept(Integer integer) throws Exception 
        println("onNext : fromArray : " + integer + "\\n");
    
);

Observable.just(items).subscribe(new Consumer<Integer[]>() 
    @Override
    public void accept(Integer[] integers) throws Exception 
        for (int i: integers) 
            println("onNext : just : " + i + "\\n");
        
    
);

输出结果:

onNext : fromArray : 0
onNext : fromArray : 1
onNext : fromArray : 2
onNext : fromArray : 3
onNext : fromArray : 4
onNext : fromArray : 5

onNext : just : 0
onNext : just : 1
onNext : just : 2
onNext : just : 3
onNext : just : 4
onNext : just : 5

上面输出结果是不一样的,使用fromArray时,接收的数据是逐个打印出来,而使用just时,直接接收的是一个数组。所以如上面所说fromArray接收的数据源是逐个发射的,而just是将数据作为一个完整的对象一次性发射的。

下面我们将继续从源码的角度进行分析:
老规矩我们还是从fromArray方法中着手分析:
Observable#fromArray

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> fromArray(T... items) 
    ObjectHelper.requireNonNull(items, "items is null");
    if (items.length == 0) 
        return empty();
     else
    if (items.length == 1) 
        return just(items[0]);
    
    return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));

fromArray方法接收的是一个可变参数,该参数又作为创建ObservableFromArray对象的参数。最后返回的是ObservableFromArray对象,作为Observable的具体实现类。

然后开始订阅观察者对象,这里我使用Consumer作为观察者。继续查看subscribe方法:
Observable#subscribe

......

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) 
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

    subscribe(ls);

    return ls;


@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) 
    ObjectHelper.requireNonNull(observer, "observer is null");
    try 
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
     catch (NullPointerException e)  // NOPMD
        throw e;
     catch (Throwable e) 
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    

这里其实和just操作符调用的逻辑一样,都是重载的subscribe方法,最后执行的是接收四个参数的subscribe方法,如上。这里的创建LambdaObserver对象,不再做重复说明。
subscribeActual其实调用的是Observable实现类中的方法,也就是ObservableFromArray中的方法。我们直接进入该类中查看:
ObservableFromArray

public final class ObservableFromArray<T> extends Observable<T> 
    final T[] array;
    public ObservableFromArray(T[] array) 
        this.array = array;
    
    @Override
    public void subscribeActual(Observer<? super T> s) 
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

        s.onSubscribe(d);

        if (d.fusionMode) 
            return;
        

        d.run();
    

    ......

subscribeActual中,创建了一个FromArrayDisposable类对象,执行了Observer中的onSubscribe方法,也就是LambdaObserver中的onSubscribe方法。我们在进入这个方法中看一下:

LambdaObserver#onSubscribe


public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete,
        Consumer<? super Disposable> onSubscribe) 
    super();
    this.onNext = onNext;
    this.onError = onError;
    this.onComplete = onComplete;
    this.onSubscribe = onSubscribe;


@Override
public void onSubscribe(Disposable s) 
    if (DisposableHelper.setOnce(this, s)) 
        try 
            onSubscribe.accept(this);
         catch (Throwable ex) 
            Exceptions.throwIfFatal(ex);
            s.dispose();
            onError(ex);
        
    

通过第四个参数 onSubscribe的接口回调方法,将FromArrayDisposable对象暴露给外部调用。这里也不再说明。
我们重点来看看FromArrayDisposable中的run方法:

FromArrayDisposable#run

void run() 
    T[] a = array;
    int n = a.length;

    for (int i = 0; i < n && !isDisposed(); i++) 
        T value = a[i];
        if (value == null) 
            actual.onError(new NullPointerException("The " + i + "th element is null"));
            return;
        
        actual.onNext(value);
    
    if (!isDisposed()) 
        actual.onComplete();
    

array 代表我们刚才传入的数组的对象。在for循环中对数组进行遍历,然后将数组中的值据逐个通过actual.onNext(value)方法发送出去。如果出现错误时,则调用actual.onError。完成发送后,在调用actual.onComplete()方法。而我们知道这里面分别调用的是LambdaObserver对象中的方法。

public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable 

    private static final long serialVersionUID = -7251123623727029452L;
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Consumer<? super Disposable> onSubscribe;

    public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete,
            Consumer<? super Disposable> onSubscribe) 
        super();
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onSubscribe = onSubscribe;
    

    @Override
    public void onSubscribe(Disposable s) 
        if (DisposableHelper.setOnce(this, s)) 
            try 
                onSubscribe.accept(this);
             catch (Throwable ex) 
                Exceptions.throwIfFatal(ex);
                s.dispose();
                onError(ex);
            
        
    

    @Override
    public void onNext(T t) 
        if (!isDisposed()) 
            try 
                onNext.accept(t);
             catch (Throwable e) 
                Exceptions.throwIfFatal(e);
                get().dispose();
                onError(e);
            
        
    

    @Override
    public void onError(Throwable t) 
        if (!isDisposed()) 
            lazySet(DisposableHelper.DISPOSED);
            try 
                onError.accept(t);
             catch (Throwable e) 
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(new CompositeException(t, e));
            
        
    

    @Override
    public void onComplete() 
        if (!isDisposed()) 
            lazySet(DisposableHelper.DISPOSED);
            try 
                onComplete.run();
             catch (Throwable e) 
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(e);
            
        
    

    @Override
    public void dispose() 
        DisposableHelper.dispose(this);
    

    @Override
    public boolean isDisposed() 
        return get() == DisposableHelper.DISPOSED;
    

如上,在LambdaObserver类中onNext,onError,onComplete分别对应的不同的参数回调,从而将接口中的数据传递给外部。这样便完成了调用。

下面列出fromArray和just操作符的简单demo:

List<String> list = new ArrayList<>();
list.add("hello");
list.add("world");

Observable.fromArray(list).subscribe(new Consumer<List<String>>() 
    @Override
    public void accept(List<String> strings) throws Exception 
        println("onNext : fromArray : " + strings + "\\n");
    
);

Observable.just(list).subscribe(new Consumer<List<String>>() 
    @Override
    public void accept(List<String> strings) throws Exception 
        println("onNext : just : " + strings + "\\n");
    
);

输出结果:

onNext : fromArray : [hello, world]

onNext : just : [hello, world]

以上是关于RxJava2.0中fromArray操作符用法和源码分析的主要内容,如果未能解决你的问题,请参考以下文章

RxJava2.0中flatMap操作符用法和源码分析

RxJava2.0中flatMap操作符用法和源码分析

关于RxJava2 0你不知道的事

RxJava和Retrofit的简单使用

给初学者的RxJava2.0教程(转)

给初学者的RxJava2.0教程(转)