RxJava2.0中map操作符用法和源码分析

Posted yuminfeng728

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava2.0中map操作符用法和源码分析相关的知识,希望对你有一定的参考价值。

map基本使用

map是变换操作符,对原始Observable发射的每一项数据应用一个你选择的函数生成新的结果,然后返回一个发射这些结果Observable。

但从字面上还是比较难以理解,我们可以用代码示例说明:

Observable.just(1,2,3).map(new Function<Integer, String>() 
    @Override
    public String apply(Integer integer) throws Exception 
        return "This is new result " + integer;
    
).subscribe(new Consumer<String>() 
    @Override
    public void accept(String s) throws Exception 
        println("accept : " + s +"\\n");
    
);

输出结果:
accept : This is new result 1

accept : This is new result 2

accept : This is new result 3

由上面代码可知,执行map操作时,首先接收原始Observable发射的数据,然后根据你的操作生成新的数据并将这些新的数据发射,这时观察者中接收的就是新生成的数据。

下面我们将从源码的角度来分析下:

这里我们首先使用just操作符创建一个Observable来发射指定的数据。关于just如何创建Observable对象,我们这里不做分析,前面文章中已经说明。这里just创建的具体对象为ObservableFromArray。我们直接分析map的源码,我们看到在调用map方法时,我们需要传入一个Function的对象:

/**
 * A functional interface that takes a value and returns another value, possibly with a
 * different type and allows throwing a checked exception.
 *
 * @param <T> the input value type
 * @param <R> the output value type
 */
public interface Function<T, R> 
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    @NonNull
    R apply(@NonNull T t) throws Exception;

如上所述,这个接口的功能主要是接收一个值(T),然后返回另一个值(R)。

我们在查看map的方法:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) 
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));

与之前其它操作符一样的调用逻辑,将当前的Observable对象和生成的Function对象作为参数,生成一个ObservableMap的对象。

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> 
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) 
        super(source);
        this.function = function;
    

    @Override
    public void subscribeActual(Observer<? super U> t) 
        source.subscribe(new MapObserver<T, U>(t, function));
    

    ......

完成了Observable对象初始化后,我们开始订阅观察者。这里我们选择使用的观察者为Consumer对象。订阅观察者时,执行subscribe方法:
Observable#subscribe

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) 
    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());



@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) 
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

    subscribe(ls);

    return ls;

同样的,将在subscribe方法中执行subscribeActual(observer)方法:

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) 
    ObjectHelper.requireNonNull(observer, "observer is null");
    try 
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
     catch (NullPointerException e)  // NOPMD
        throw e;
     catch (Throwable e) 
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    

这里其实执行的是ObservableMap中的subscribeActual方法。在subscribeActual方法中,首先会创建一个MapObserver对象,参数t对应的是LambdaObserver。
然后执行source.subscribe方法,source代表的是之前的Observable对象,也就是just创建的ObservableFromArray对象,所以再次调用Observable中subscribe方法,执行subscribeActual,而这次执行的对象是ObservableFromArray,而参数observer具体实现是MapObserver的对象:

public final class ObservableFromArray<T> extends Observable<T> 
    final T[] array;
    public ObservableFromArray(T[] array) 
        this.array = array;
    
    @Override
    public void subscribeActual(Observer<? super T> s) 
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

        s.onSubscribe(d);

        if (d.fusionMode) 
            return;
        

        d.run();
    

    ......    

到这里执行的逻辑与之前分析fromArray操作符用法相同,不再做具体分析。在FromArrayDisposable中具体执行的run方法中:
FromArrayDisposable#run

void run() 
    T[] a = array;
    int n = a.length;

    for (int i = 0; i < n && !isDisposed(); i++) 
        T value = a[i];
        if (value == null) 
            actual.onError(new NullPointerException("The " + i + "th element is null"));
            return;
        
        actual.onNext(value);
    
    if (!isDisposed()) 
        actual.onComplete();
    

如上此时actual是由MapObserver实现的,我们看下MapObserver的onNext方法:
MapObserver#onNext:

@Override
public void onNext(T t) 
    if (done) 
        return;
    

    if (sourceMode != NONE) 
        actual.onNext(null);
        return;
    

    U v;

    try 
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
     catch (Throwable ex) 
        fail(ex);
        return;
    
    actual.onNext(v);

当调用onNext方法时,通过Function接口回调apply方法获得转换后的数据,然后再通过 actual.onNext(v)方法发射出去。此时的actual中的onNext方法就可以接收新的参数了,而actual就是之前初始化的LambdaObserver对象。通过它可以让Consumer的accept方法中接收该数据。这里的执行逻辑之前已经分析过,这里不再详述。

以上是关于RxJava2.0中map操作符用法和源码分析的主要内容,如果未能解决你的问题,请参考以下文章

RxJava2.0中flatMap操作符用法和源码分析

RxJava2.0中fromArray操作符用法和源码分析

RxJava之七——RxJava 2.0 图文分析create() subscribe()map()observeOn()subscribeOn()源码

RxJava的map操作符源码分析

Java 代码实例 14BeanUtils用法详解,附源码分析

Rxjava源码分析&实践实践环节:map操作符功能实现