RxSwift之深入解析核心逻辑Observable的底层原理
Posted Forever_wj
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxSwift之深入解析核心逻辑Observable的底层原理相关的知识,希望对你有一定的参考价值。
一、前言
- 现有一段 RxSwift 使用序列并监听序列消息发送的示例代码,如下所示:
// 创建序列
let ob = Observable<Any>.create { (observer) -> Disposable in
// 发送信号
observer.onNext("发送信号")
observer.onError(NSError.init(domain: "EpisodeError", code: 10086, userInfo: nil))
return Disposables.create()
}
// 订阅信号
let _ = ob.subscribe(onNext: { (text) in
print("订阅到:\\(text)")
}, onError: { (error) in
print("error:\\(error)")
}, onCompleted: {
print("完成")
}) {
print("销毁")
}.disposed(by: disposeBag)
- 代码分析:
-
- 通过 Observable 的 create 创建序列,在 create 闭包内调用 onNext 方法实现信号发送;
-
- 调用 subscribe 方法订阅序列,并实现 subscribe 的参数闭包 onNext,在闭包内监听信号;
-
- 最后通过 disposed 对序列打包等待销毁。
- 运行程序,结果如下:
订阅到:发送信号
完成
销毁
- 那么,发送的信号是如何被订阅到的呢?按正常逻辑,订阅后才能收到信息,我们可以猜测,在成为订阅者并布置好监听后,订阅者向序列发送了一条消息,通知可观察序列可以发送信号。大致流程如下所示:
二、核心逻辑分析
① 创建序列
- 创建序列的源码如下所示:
extension ObservableType {
// MARK: create
/**
Creates an observable sequence from a specified subscribe method implementation.
- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
}
- 该方法是对 ObservableType 协议的扩展,最外层实现的闭包 subscribe 则作为参数传入 AnonymousObservable,并返回 AnonymousObservable 对象。可观察序列的创建是利用协议拓展功能的 create 方法实现的,里面创建了一个 AnonymousObservable(匿名可观察序列) 。
- 继续执行追踪到 AnonymousObservable 类,如下:
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
- AnonymousObservable 的继承链如下:
AnonymousObservable -> Product -> ObservableType -> ObservableConvertible
- 至此为止,序列的创建完成:
-
- create 方法的时候创建了一个内部对象 AnonymousObservable;
-
- AnonymousObservable 保存了外界的闭包;
-
- AnonymousObservable 继承了 Producer 具有非常重要的方法 subscribe。
- AnonymousObservable 的继承关系如下所示:
② 订阅序列
- 根据方法名找到 subscribe 的实现(订阅方法 subscribe 和上面所说的 subscribe 不是同一个方法),该方法是对 ObservableType 的拓展。在方法内部已经出现对观察者的定义,AnonymousObserver 类型的闭包 observer,源码如下:
extension ObservableType {
public func subscribe(onNext: ((E) -> Void)? = nil, ...) -> Disposable {
// 此处省略不影响探索的代码
......
let observer = AnonymousObserver<E> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
- 说明:
-
- E 是 Swift 的关联类型,仔细观察可观察序列的继承链源码,应该不难得出:E 就是序列类型,这里就是 String:
public class Observable<Element> : ObservableType {
/// Type of elements in sequence.
public typealias E = Element
-
- observer 内部调用的外部(应用层)实现的闭包,由此看出所有信号是由此发出,event 是 observer 的参数,不难看出,observer 闭包也是在其他地方调用,传入带有信号值的 event 参数。
-
- observer 的继承链关系如下:
- observer 被当做参数传入到 subscribe 中,而 observer 的调用必然是在 subscribe 中实现:
self.asObservable().subscribe(observer)
- self.asObservable() 该方法返回本身,保证协议的一致性,方法如下:
public class Observable<Element> : ObservableType {
// 省去代码若干
......
public func asObservable() -> Observable<E> {
return self
}
}
- self.asObservable().subscribe(observer) 其实本质就是 self.subscribe(observer),继续断点执行找到 subscribe 方法,通过可观察序列的继承关系,可以非常快速的定位 Producer 订阅代码,正是上面所提到的 Producer 中的方法,方法如下:
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
// 省去代码若干
......
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
- 接着,observer 观察者被传入到 run 中,上面说到该观察者一定会被调用,继续深入:
let sinkAndSubscription = self.run(observer, cancel: disposer)
- 继续断点执行,发现 self.run 的调用,调用的是 AnonymousObservable 中的 run 方法,代码如下:
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
- 此处就是创建序列时的 AnonymousObservable 类,在 run 方法类创建了 sink 对象,在初始化时传入了上面所说的观察者,记住 sink 保存了观察者 observer 闭包,并且调用了 sink.run(self) 方法,传入的是创建时产生的可观察序列 observable 闭包对象,深入 run:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// 省去代码若干
// 此处向父类Sink初始化了observer对象
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
- 此处 parent 由 let subscription = sink.run(self) 传入,self 即为创建序列 create 方法返回的 observable 对象,而 _subscribeHandler 是创建序列所保存的闭包,此时闭包就被调用了,被调用闭包如下:
let obs = Observable<Any>.create { (observer) -> Disposable in
// 发送消息
observer.onNext("我是一条消息")
return Disposables.create()
}
- 发送信号的闭包被调用,接下来就是信号发送。订阅的内部流程如下:
③ 发送信号
- 继续探索,代码已经执行到业务层,在信号发送闭包中通常调用一下三种方法,用来发送信号,如下:
-
- observer.onNext(“发送消息”) 信号发送;
-
- observer.onCompleted() 序列完成,完成后序列将被释放;
-
- observer.onError(error) 序列出错中断,序列不可继续使用,被释放。
- 以上三个方法为 ObserverType 的拓展方法(E 表示一个泛型信号量,可表示任意类型的信号):
extension ObserverType {
/// Convenience method equivalent to `on(.next(element: E))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: E) {
self.on(.next(element))
}
/// Convenience method equivalent to `on(.completed)`
public func onCompleted() {
self.on(.completed)
}
/// Convenience method equivalent to `on(.error(Swift.Error))`
/// - parameter error: Swift.Error to send to observer(s)
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
- .next(element) 是一个带泛型参数的枚举,管理了三种类型事件的消息传递,如下:
public enum Event<Element> {
/// Next element is produced.
case next(Element)
/// Sequence terminated with an error.
case error(Swift.Error)
/// Sequence completed successfully.
case completed
}
- on 是 AnonymousObservableSink 中的方法,代码如下:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// 代码省略若干行
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(&self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(&self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
}
- 内部根据 Event 枚举不同成员变量做不同的信号发送,信号发送调用了 forwardOn 方法。方法实现如下:
class Sink<O : ObserverType> : Disposable {
init(observer: O, cancel: Cancelable) {
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<O.E>) {
if isFlagSet(&self._disposed, 1) {
return
}
self._observer.on(event)
}
}
- 代码有些长只保留了核心部分,Sink 即 AnonymousObservableSink 的父类,_observer 即是订阅中在内部产生的 AnonymousObserver 对象,而该对象调用了 on 方法并传递了信号,on 方法所在位置如下:AnonymousObserver -> ObserverBase -> on()
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
func on(_ event: Event<E>) {
switch event {
case .next:
if load(&self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(&self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
}
- 在方法内部又掉用了 self.onCore(event),此时该方法在 AnonymousObserver 中实现,代码如下:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
- 说明:
-
- 此处通过 _eventHandler 来发送信号,_eventHandler 是从哪来的呢?逆推 onCore 调用者是 observer,而 observer 是订阅时在内部创建的,被一层层传入到此;
-
- 而在 observer 初始化时即被保存为 _eventHandler,_eventHandler 调用即调用了订阅时创建的 observer 闭包,进而信号又通过闭包内的闭包传出到业务层。
// 订阅序列
obs.subscribe(onNext: { (val) in
print("onNext:\\(val)")
}).disposed(by: disposeBag)
- 发送信息的内部流程如下:
- 响应式编程的创建、订阅、发送、接收等流程就已完成,整个流程会觉得很复杂,但它统一了所有事件的创建与监听,统一思想快速开发,今后的开发流程就是:
创建序列 -> 订阅序列 -> 发送序列 -> 响应序列
- sink 在 Rx 中充当管理者,管理序列,观察者和销毁者,将序列发送至观察者,并管理销毁者适时消耗序列,回收资源。
三、总结
- 分析思维导图:
- 核心逻辑流程图:
以上是关于RxSwift之深入解析核心逻辑Observable的底层原理的主要内容,如果未能解决你的问题,请参考以下文章