RxSwift之深入解析map操作符的底层实现

Posted Forever_wj

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxSwift之深入解析map操作符的底层实现相关的知识,希望对你有一定的参考价值。

一、map 操作符的使用

  • map 操作符将源 Observable 的每个元素转换一遍,然后返回含有转换结果的 Observable:

  • 现有如下示例:
Observable<Int>.of(1,2,3,4,5,6)
    .subscribe(onNext: { (val) in
        print(val)
    }).disposed(by: disposeBag)
// 执行结果
123456
  • 在 map 操作:
Observable<Int>.of(1,2,3,4,5,6)
    .map{
        $0+10
    }
    .subscribe(onNext: { (val) in
        print(val)
    }).disposed(by: disposeBag)
// 执行结果
111213141516
  • 说明:
    • of 初始化序列,序列元素类型需保存一致;
    • map操作符,操作序列每个元素加 10 后作为新元素,构成新的序列。
  • 那么,map 是如何给序列重新设置新值的呢?

二、map 源码分析

① map 函数的定义

    • map 闭包就像加工的机器,设定好加工程序 $0+10 就会对 of 中的每一个元素加工产出新的零件,首先来看一下 map 源码都处理了哪些业务:
extension ObservableType {
    public func map<R>(_ transform: @escaping (E) throws -> R)
        -> Observable<R> {
        return self.asObservable().composeMap(transform)
    }
}
  • 分析:
    • transform 逃逸闭包,转换逻辑交给业务层;
    • asObservable() 保证协议的一致性。
  • 可以看到 map 函数是一个带闭包参数的 ObservableType 的扩展函数,内部调用了 composeMap 并传入外部的闭包以便内部调用。我们猜测,该处闭包会被保留在内部,在订阅时被使用,那么根据断点继续探索,看看外界的闭包最终会保留在何处。
  • composeMap 所在类,如下所示:
    • source 向 _map 函数传入了 self 即为当前的序列对象;
    • transform 追踪的外部闭包。
public class Observable<Element> : ObservableType {
    // Type of elements in sequence.
    public typealias E = Element
          ...
          ...
    internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
        return _map(source: self, transform: transform)
    }
}
  • 可以看到,ObservableType 的子类 Observable 实现 composeMap 方法,返回 Observable 类型的对象,在内部调用了 _map 方法:
internal func _map<Element, R>(source: Observable<Element>, transform: @escaping (Element) throws -> R) -> Observable<R> {
    return Map(source: source, transform: transform)
}
  • 继续向 Map 内部传入序列,及业务层闭包,一直强调序列和业务层闭包,主要由于结构复杂,以免被遗忘,后续和订阅难以被联系在一起。

② Map 类

  • 查看 Map 类,如下:
    • Map 继承自 Producer,而 Producer 继承自 Observable,提供了连接序列和观察者的方法对象 sink,及发送序列元素到观察者,再返回到订阅;
    • Map 中保留源序列及业务层闭包方法;
    • run 方法会在父类 Producer 类中方法调用,父类指针指向子类对象。
final private class Map<SourceType, ResultType>: Producer<ResultType> {
    typealias Transform = (SourceType) throws -> ResultType

    private let _source: Observable<SourceType>

    private let _transform: Transform

    init(source: Observable<SourceType>, transform: @escaping Transform) {
        self._source = source
        self._transform = transform

#if TRACE_RESOURCES
        _ = increment(&_numberOfMapOperators)
#endif
    }

    override func composeMap<R>(_ selector: @escaping (ResultType) throws -> R) -> Observable<R> {
        let originalSelector = self._transform
        return Map<SourceType, R>(source: self._source, transform: { (s: SourceType) throws -> R in
            let r: ResultType = try originalSelector(s)
            return try selector(r)
        })
    }

    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
        let sink = MapSink(transform: self._transform, observer: observer, cancel: cancel)
        let subscription = self._source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

③ 订阅

extension ObservableType {
    // 业务层订阅调用
    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            
            let observer = AnonymousObserver<E> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
}
  • self.asObservable().subscribe(observer) 此处调用的则是 Producer 中的 subscribe 方法,该处方法实现逻辑如下:
class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }
}

④ run 方法

  • 继续查看内部 self.run 方法调用,它的继承链与 RxSwift之深入解析核心逻辑Observable的底层原理 中的继承链有所不同,它们的继承链对比如下:
    • RxSwift 核心逻辑中的 Producer 的子类是 AnonymousObservable,run方法在此类实现;
    • Map 源码中 Producer 的子类是 Map,run 方法在该处被实现。
  • run 方法的实现如下:
    • MapSink 方法和 RxSwift 核心逻辑中的 AnnonymousObservableSink 类似;
    • self._source 此处为订阅时保存的闭包;
    • .subscribe(sink)Producer 类的方法,传入 sink 用来调用 sink 中的 on 方法。
override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
    let sink = MapSink(transform: self._transform, observer: observer, cancel: cancel)
    let subscription = self._source.subscribe(sink)
    return (sink: sink, subscription: subscription)
}
  • MapSink 中保留的是观察者,Map 中保留的为可观察序列 Observable,通过 Observable 来触发观察者的方法调用,subscribe 方法中调用的 sinkAndSubscription = self.run(observer, cancel: disposer):
final private class ObservableSequence<S: Sequence>: Producer<S.Iterator.Element> {
    fileprivate let _elements: S
    fileprivate let _scheduler: ImmediateSchedulerType

    init(elements: S, scheduler: ImmediateSchedulerType) {
        self._elements = elements
        self._scheduler = scheduler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
        let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}
  • ObservableSequence 是继承自 Producer 的方法,内部创建了 ObservableSequenceSink 对象并传入了当前 Observable 对象和 observer 对象,最后调用 run() 方法,此处内部为变量序列并调用观察者闭包方法,向外界发送消息。ObservableSequence 类继承自 Sink,由此可知会调用 Sink 中的 forwardOn 方法,实现如下:
final private class ObservableSequenceSink<S: Sequence, O: ObserverType>: Sink<O> where S.Iterator.Element == O.E {
    typealias Parent = ObservableSequence<S>

    private let _parent: Parent

    init(parent: Parent, observer: O, cancel: Cancelable) {
        self._parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    func run() -> Disposable {
        return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
            var mutableIterator = iterator
            if let next = mutableIterator.next() {
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
}
  • _elements 是由 of 创建时保留的序列集合,此处对序列元素进行遍历,并调用 forwardOn 方法发送元素。forwardOn 的实现如下,_observer 是上面传入的 MapSink 对象:
class Sink<O : ObserverType> : Disposable {
    fileprivate let _observer: O
    fileprivate let _cancel: Cancelable
    fileprivate var _disposed = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    init(observer: O, cancel: Cancelable) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<O.E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(&self._disposed, 1) {
            return
        }
        self._observer.on(event)
    }
}
  • 可以看到,此处调用了 sink 的 on 方法,self._observer.on(event)。继续追踪 MapSink 类的 on 方法实现:
final private class MapSink<SourceType, O: ObserverType>: Sink<O>, ObserverType {
    typealias Transform = (SourceType) throws -> ResultType

    typealias ResultType = O.E
    typealias Element = SourceType

    private let _transform: Transform

    init(transform: @escaping Transform, observer: O, cancel: Cancelable) {
        self._transform = transform
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<SourceType>) {
        switch event {
        case .next(let element):
            do {
                let mappedElement = try self._transform(element)
                self.forwardOn(.next(mappedElement))
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        case .error(let error):
            self.forwardOn(.error(error))
            self.dispose()
        case .completed:
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}
  • 至此,就容易理解了,这里的 on 和 RxSwift 核心逻辑中的不同:
    • RxSwift 核心逻辑中此处由业务层 onNext 来触发;
    • Map 中是通过设定好的 of 序列直接触发。
  • 元素处理:
    • let mappedElement = try self._transform(element) 调用外界闭包获取新值;
    • self.forwardOn(.next(mappedElement)) 通过 forwardOn 将新值发送至订阅者。
  • 最终会调用 ObserverBase 中的 on 方法,再调用观察者 observer 的 onCore 方法,向观察者发送元素。在由观察者调用业务层订阅时实现的闭包将序列元素发送到了业务层,到此 map 就完成了对源序列的修改。

三、总结

  • map 是对 sink 做了一层封装,根据业务层的 map 设置在 ObservableSequenceSink 中处理了序列元素再发送至 forwardOn 直至 Observer 对象,由此完成了对元素的加工处理。
  • RxSwift 源码比较繁琐,复杂的逻辑带来的是高效的开发,高效的运行,因此对 RxSwfit 源码还需要进一步地理解和分析。

以上是关于RxSwift之深入解析map操作符的底层实现的主要内容,如果未能解决你的问题,请参考以下文章

RxSwift之深入解析核心逻辑Observable的底层原理

RxSwift之深入解析dispose源码的实现

RxSwift之深入解析Using操作的应用和原理

RxSwift之深入解析Subject的使用和实现原理

RxSwift之深入解析特殊序列deallocating与deallocated的源码实现

RxSwift之深入解析Observable序列的创建