RxSwift之深入解析Subject的使用和实现原理
Posted Forever_wj
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxSwift之深入解析Subject的使用和实现原理相关的知识,希望对你有一定的参考价值。
一、Subject
- RxSwift 的核心逻辑 Observable 不具备发送事件的能力,创建一个 Observable 的时候就要预先将要发出的数据都准备好,等到有人订阅它时再将数据通过 Event 发出去。但有时希望 Observable 在运行时能动态地获得或者说产生一个新的数据,再通过 Event 发送出去。比如,订阅一个输入框的输入内容,当用户每输入一个字符之后,输入框关联的 Observable 就会发出一个带有输入内容的 Event,通知给所有订阅者。为此,RxSwift 提供了一种可以发送事件又可以订阅事件值的对象,它就是 Subject。
- Subject 既是订阅者,也是 Observable:它是订阅者,是因为能够动态地接收新的值;它是 Observable,是因为当 Subjects 有了新的值之后,就会通过 Event 将新值发出给它的所有订阅者。
- Subject 常用的方法:
-
- onNext( : ):是 on(.next( : )) 的简写,该方法相当于 subject 接收到一个.next 事件;
-
- onError( : ):是 on(.error( : )) 的简写,该方法相当于 subject 接收到一个 .error 事件;
-
- onCompleted():是 on(.completed) 的简写,该方法相当于 subject 接收到一个 .completed 事件。
二、PublishSubject
- PublishSubject 不需要初始值就能创建,它的订阅者从开始订阅的时间点起,可以收到订阅后 Subject 发出的新 Event,而不会收到它们在订阅前已发出的 Event,即 PublishSubject 仅仅发送在订阅之后由源 Observable 发送的数据。
- 如下所示,最上面是 PublishSubject,下面分别表示两个新的订阅,它们订阅的时间点不同,可以发现 PublishSubject 的订阅者只能收到它们订阅后的 Event:
- PublishSubject 一旦被建立就会立刻开始发送事件(除非采取方法去阻止它),这种机制有丢失事件的风险,因为在 Subject 被创建和被监听之间有一定的时间间隔,如果想保证所有的事件都可以被监听到的话,可以有两种方法:
-
- 第一种方法是使用 Create 方法(在发送之前检查是否所有 observer 已经订阅);
-
- 第二种方法是可以使用 ReplaySubject。
- 如果源 Observable 被一个 error 中断,PublishSubject 将不会发送任何事件给后续的 observer,但是它会传递 error 信息,如下所示:
- 使用示例如下:
let disposeBag = DisposeBag()
// 创建一个PublishSubject
let subject = PublishSubject<String>()
// 由于当前没有任何订阅者,所以该信息不会被输出到控制台
subject.onNext("1")
// 第1次订阅subject
subject.subscribe(onNext: { string in
print("第1次订阅:", string)
}, onCompleted:{
print("第1次订阅:onCompleted")
}).disposed(by: disposeBag)
// 当前有1个订阅,则该信息会被输出到控制台
subject.onNext("2")
// 第2次订阅subject
subject.subscribe(onNext: { string in
print("第2次订阅:", string)
}, onCompleted:{
print("第2次订阅:onCompleted")
}).disposed(by: disposeBag)
// 当前有2个订阅,则该信息会输出到控制台
subject.onNext("3")
// subject结束
subject.onCompleted()
// subject完成后会发出.next事件
subject.onNext("4")
// subject完成后它的所有订阅(包括结束后的订阅),都能收到subject的.completed事件
subject.subscribe(onNext: { string in
print("第3次订阅:", string)
}, onCompleted:{
print("第3次订阅:onCompleted")
}).disposed(by: disposeBag)
- 运行结果如下:
第1次订阅: 2
第1次订阅: 3
第2次订阅: 3
第1次订阅:onCompleted
第2次订阅:onCompleted
第3次订阅:onCompleted
三、AsyncSubject
- AsyncSubject 只发送由源 Observable 发送的最后一个事件,并且只在源 Observable 完成之后(如果源 Observable 没有发送任何值,AsyncSubject 也不会发送任何值):
- AsyncSubject 会发送相同的值给所有 observer。但是,如果源 Observable 被 error 中断了发送,AsyncSubject 便不会发送任何事件,而是会发送从源 Observable 传来的 error 提示:
- 使用示例:
let disposeBag = DisposeBag()
// 创建序列
let asynSub = AsyncSubject<Int>.init()
// 发送信号
asynSub.onNext(1)
asynSub.onNext(2)
// 订阅序列
asynSub.subscribe{ print("订阅到:",$0)}
.disposed(by: disposeBag)
// 再次发送
asynSub.onNext(3)
asynSub.onNext(4)
// asynSub.onError(NSError.init(domain: "dw", code: 10086, userInfo: nil))
asynSub.onCompleted()
- 运行结果如下:
订阅到: next(4)
订阅到: completed
四、BehaviorSubject
- 当一个 observer 订阅一个 BehaviorSubject,它就开始发送最近由源 Observable 发送的事件(或者是还没有被发送的种子值/默认值),然后继续发送从源 Observable 接收到的其它事件。如下所示:
- 如果源 Observable 被一个 error 中断,那么 BehaviorSubject 不会发送事件给后续的 observer,但会传递给它们 error 的信息。如下所示:
- 使用示例:
let disposeBag = DisposeBag()
// 创建一个BehaviorSubject
let subject = BehaviorSubject(value: "1")
// 第1次订阅subject
subject.subscribe { event in
print("第1次订阅:", event)
}.disposed(by: disposeBag)
// 发送next事件
subject.onNext("2")
// 发送error事件
subject.onError(NSError(domain: "dw", code: 0, userInfo: nil))
// 第2次订阅subject
subject.subscribe { event in
print("第2次订阅:", event)
}.disposed(by: disposeBag)
- 运行结果如下:
第1次订阅: next(1)
第1次订阅: next(2)
第1次订阅: error(Error Domain=dw Code=0 "(null)")
第2次订阅: error(Error Domain=dw Code=0 "(null)")
五、ReplaySubject
- ReplaySubject 在创建时候需要设置一个 bufferSize,表示它对于它发送过的 event 的缓存个数。比如一个 ReplaySubject 的 bufferSize 设置为 2,它发出了 3 个 .next 的 event,那么它会将后两个(最近的两个)event 给缓存起来。此时如果有一个 subscriber 订阅了这个 ReplaySubject,那么这个 subscriber 就会立即收到前面缓存的两个.next 的 event。
- 如果一个 subscriber 订阅已经结束的 ReplaySubject,除了会收到缓存的 .next 的 event外,还会收到那个终结的 .error 或者 .complete 的event。
- 如下所示,最上面是 ReplaySubject(bufferSize 设为为 2),下面分别表示两个新的订阅,它们订阅的时间点不同,可以发现 ReplaySubject 的订阅者一开始就能收到 ReplaySubject 之前发出的两个 Event(如果有):
- 使用示例:
let disposeBag = DisposeBag()
// 创建一个bufferSize为2的ReplaySubject
let subject = ReplaySubject<String>.create(bufferSize: 2)
// 连续发送3个next事件
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")
// 第1次订阅subject
subject.subscribe { event in
print("第1次订阅:", event)
}.disposed(by: disposeBag)
// 再发送1个next事件
subject.onNext("4")
// 第2次订阅subject
subject.subscribe { event in
print("第2次订阅:", event)
}.disposed(by: disposeBag)
// 让subject结束
subject.onCompleted()
// 第3次订阅subject
subject.subscribe { event in
print("第3次订阅:", event)
}.disposed(by: disposeBag)
- 运行结果如下:
第1次订阅: next(2)
第1次订阅: next(3)
第1次订阅: next(4)
第2次订阅: next(3)
第2次订阅: next(4)
第1次订阅: completed
第2次订阅: completed
第3次订阅: next(3)
第3次订阅: next(4)
第3次订阅: completed
六、Variable
- Variable 其实就是对 BehaviorSubject 的封装,所以它也必须要通过一个默认的初始值进行创建,它具有 BehaviorSubject 的功能,能够向它的订阅者发出上一个 event 以及之后新创建的 event。
- 不同的是,Variable 还会把当前发出的值保存为自己的状态,同时它会在销毁时自动发送 .complete 的 event,不需要也不能手动给 Variables 发送 completed 或者 error 事件来结束它。
- 简单地说,就是 Variable 有一个 value 属性,改变这个 value 属性的值就相当于调用一般 Subjects 的 onNext() 方法,而这个最新的 onNext() 的值就被保存在 value 属性里,直到再次修改它。
- Variables 本身没有 subscribe() 方法,但是所有 Subjects 都有一个 asObservable() 方法,可以使用这个方法返回这个 Variable 的 Observable 类型,拿到这个 Observable 类型就能订阅它。
- 使用示例:
let disposeBag = DisposeBag()
// 创建一个初始值为1的Variable
let variable = Variable("1")
// 修改value值
variable.value = "2"
// 第1次订阅
variable.asObservable().subscribe {
print("第1次订阅:", $0)
}.disposed(by: disposeBag)
// 修改value值
variable.value = "3"
// 第2次订阅
variable.asObservable().subscribe {
print("第2次订阅:", $0)
}.disposed(by: disposeBag)
// 修改value值
variable.value = "4"
- 运行结果:
第1次订阅: next(2)
第1次订阅: next(3)
第2次订阅: next(3)
第1次订阅: next(4)
第2次订阅: next(4)
第1次订阅: completed
第2次订阅: completed
- Variable 虽然被废弃了,但是由于 Variable 的灵活性,因此在开发里面应用非常之多。
七、BehaviorRelay
- BehaviorRelay 会替换原来的 Variable,储存一个信号,并且可以随时订阅响应,但响应发送的时候要注意 behaviorR.accept(20):
let disposeBag = DisposeBag()
let behaviorRelay = BehaviorRelay(value: 100)
behaviorRelay.subscribe(onNext: { (num) in
print(num)
.disposed(by: disposeBag)
print("打印:\\(behaviorRelay.value)")
behaviorRelay.accept(1000)
八、Subject 原理分析
① SubjectType
- SubjectType 继承自 ObservableType,具有序列特性,并且关联观察者类型,具备观察者类型的能力,如下:
public protocol SubjectType : ObservableType {
// 关联观察者类型,具备观察者类型的能力
associatedtype SubjectObserverType : ObserverType
func asObserver() -> SubjectObserverType
}
- 现有如下实例 subject:
let disposeBag = DisposeBag()
// 初始化序列
let publishSub = PublishSubject<Int>()
// 发送响应序列
publishSub.onNext(1)
// 订阅序列
publishSub.subscribe { print("订阅到:",$0)}
.disposed(by: disposeBag)
// 再次发送响应
publishSub.onNext(2)
publishSub.onNext(3)
- PublishSubject 很明显能够订阅信号(序列最基本的能力),并且能够发送响应,又是观察者的能力。
② 订阅响应流程
public override func subscribe -> Disposable {
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
func _synchronized_subscribe -> Disposable {
......
......
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
- self._observers.insert(observer.on): 通过一个集合添加进去所有的订阅事件,很明显在合适的地方一次性全部执行。其中也返回这次订阅的销毁者,方便执行之后的工作:synchronizedUnsubscribe->self._observers.removeKey(disposeKey)。
mutating func removeKey(_ key: BagKey) -> T? {
if _key0 == key {
_key0 = nil
let value = _value0!
_value0 = nil
return value
}
if let existingObject = _dictionary?.removeValue(forKey: key) {
return existingObject
}
for i in 0 ..< _pairs.count where _pairs[i].key == key {
let value = _pairs[i].value
_pairs.remove(at: i)
return value
}
return nil
}
- 遍历通过 key 获取响应 bag 中的 value,执行集合移除,因为没有相应持有关系,达到自动释放销毁。
③ 发送信号流程
public func on(_ event: Event<Element>) {
dispatch(self._synchronized_on(event), event)
}
- 这里调用了 dispatch 函数,传了两个参数:self._synchronized_on(event) 和 event,查看 dispatch 函数源码:
func dispatch<E>(_ bag: Bag) {
bag._value0?(event)
if bag._onlyFastPath {
return
}
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
if let dictionary = bag._dictionary {
for element in dictionary.values {
element(event)
}
}
}
- bag._value0?(event) 首先执行事件的回调,然后判断 bag._onlyFastPath 的情况,默认会开启快速通道,如果是开启慢速通道,需要从刚刚添加进 bag 包里面的匹配,挨个进行 pairs[i].value(event) 外界事件回调,然后拿回外界封装的闭包的闭包调用 :element(event)。
func _synchronized_on(_ event: Event<E>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
if self._stoppedEvent == nil {
self._stoppedEvent = event
self._stopped = true
let observers = self._observers
self._observers.removeAll()
return observers
}
return Observers()
}
}
- 如果 self._isDisposed || self._stopped 成立,就会返回一个空的集合,也就没有序列的响应。在 .completed, .error 都会改变状态 self._stopped = true,也就是说序列完成或者错误之后都无法再次响应。在.completed, .error 还会移除添加在集合里面的内容。
- Subject 流程图如下(Subject 把订阅流程和响应流程都内部实现,所以没有必要引入 sink):
④ Subject 对比
- PublishSubject、BehaviorSubject、ReplaySubject、AsyncSubject、Variable,它们之间既有各自的特点,也有相同之处:
-
- 首先它们都是 Observable,它们的订阅者都能收到它们发出的新的 Event;
-
- 直到 Subject 发出 .complete 或者 .error 的 Event 后,该 Subject 便终结,同时它也就不会再发出 .next 事件;
-
- 对于那些在 Subject 终结后再订阅它的订阅者,也能收到 subject 发出的一条 .complete 或 .error 的 event,告诉新的订阅者它已经终结。
- 它们之间最大的区别只是在于:当一个新的订阅者刚订阅它的时候,能不能收到 Subject 以前发出过的旧 Event,如果能的话又能收到多少个 Event。
以上是关于RxSwift之深入解析Subject的使用和实现原理的主要内容,如果未能解决你的问题,请参考以下文章
RxSwift之深入解析特殊序列deallocating与deallocated的源码实现