RxSwift之深入解析Subject的使用和实现原理

Posted Forever_wj

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxSwift之深入解析Subject的使用和实现原理相关的知识,希望对你有一定的参考价值。

一、Subject

  • RxSwift 的核心逻辑 Observable 不具备发送事件的能力,创建一个 Observable 的时候就要预先将要发出的数据都准备好,等到有人订阅它时再将数据通过 Event 发出去。但有时希望 Observable 在运行时能动态地获得或者说产生一个新的数据,再通过 Event 发送出去。比如,订阅一个输入框的输入内容,当用户每输入一个字符之后,输入框关联的 Observable 就会发出一个带有输入内容的 Event,通知给所有订阅者。为此,RxSwift 提供了一种可以发送事件又可以订阅事件值的对象,它就是 Subject。
  • Subject 既是订阅者,也是 Observable:它是订阅者,是因为能够动态地接收新的值;它是 Observable,是因为当 Subjects 有了新的值之后,就会通过 Event 将新值发出给它的所有订阅者。
  • Subject 常用的方法:
    • onNext( : ):是 on(.next( : )) 的简写,该方法相当于 subject 接收到一个.next 事件;
    • onError( : ):是 on(.error( : )) 的简写,该方法相当于 subject 接收到一个 .error 事件;
    • onCompleted():是 on(.completed) 的简写,该方法相当于 subject 接收到一个 .completed 事件。

二、PublishSubject

  • PublishSubject 不需要初始值就能创建,它的订阅者从开始订阅的时间点起,可以收到订阅后 Subject 发出的新 Event,而不会收到它们在订阅前已发出的 Event,即 PublishSubject 仅仅发送在订阅之后由源 Observable 发送的数据。
  • 如下所示,最上面是 PublishSubject,下面分别表示两个新的订阅,它们订阅的时间点不同,可以发现 PublishSubject 的订阅者只能收到它们订阅后的 Event:

  • PublishSubject 一旦被建立就会立刻开始发送事件(除非采取方法去阻止它),这种机制有丢失事件的风险,因为在 Subject 被创建和被监听之间有一定的时间间隔,如果想保证所有的事件都可以被监听到的话,可以有两种方法:
    • 第一种方法是使用 Create 方法(在发送之前检查是否所有 observer 已经订阅);
    • 第二种方法是可以使用 ReplaySubject。
  • 如果源 Observable 被一个 error 中断,PublishSubject 将不会发送任何事件给后续的 observer,但是它会传递 error 信息,如下所示:

  • 使用示例如下:
let disposeBag = DisposeBag()
 
// 创建一个PublishSubject
let subject = PublishSubject<String>()
 
// 由于当前没有任何订阅者,所以该信息不会被输出到控制台
subject.onNext("1")
 
// 第1次订阅subject
subject.subscribe(onNext: { string in
    print("第1次订阅:", string)
}, onCompleted:{
    print("第1次订阅:onCompleted")
}).disposed(by: disposeBag)
 
// 当前有1个订阅,则该信息会被输出到控制台
subject.onNext("2")
 
// 第2次订阅subject
subject.subscribe(onNext: { string in
    print("第2次订阅:", string)
}, onCompleted:{
    print("第2次订阅:onCompleted")
}).disposed(by: disposeBag)
 
// 当前有2个订阅,则该信息会输出到控制台
subject.onNext("3")
 
// subject结束
subject.onCompleted()
 
// subject完成后会发出.next事件
subject.onNext("4")
 
// subject完成后它的所有订阅(包括结束后的订阅),都能收到subject的.completed事件
subject.subscribe(onNext: { string in
    print("第3次订阅:", string)
}, onCompleted:{
    print("第3次订阅:onCompleted")
}).disposed(by: disposeBag)
  • 运行结果如下:
1次订阅: 21次订阅: 32次订阅: 31次订阅:onCompleted
第2次订阅:onCompleted
第3次订阅:onCompleted

三、AsyncSubject

  • AsyncSubject 只发送由源 Observable 发送的最后一个事件,并且只在源 Observable 完成之后(如果源 Observable 没有发送任何值,AsyncSubject 也不会发送任何值):

  • AsyncSubject 会发送相同的值给所有 observer。但是,如果源 Observable 被 error 中断了发送,AsyncSubject 便不会发送任何事件,而是会发送从源 Observable 传来的 error 提示:

  • 使用示例:
let disposeBag = DisposeBag()
// 创建序列
let asynSub = AsyncSubject<Int>.init()
// 发送信号
asynSub.onNext(1)
asynSub.onNext(2)
// 订阅序列
asynSub.subscribe{ print("订阅到:",$0)}
    .disposed(by: disposeBag)
// 再次发送
asynSub.onNext(3)
asynSub.onNext(4)
// asynSub.onError(NSError.init(domain: "dw", code: 10086, userInfo: nil))
asynSub.onCompleted()
  • 运行结果如下:
订阅到: next(4)
订阅到: completed

四、BehaviorSubject

  • 当一个 observer 订阅一个 BehaviorSubject,它就开始发送最近由源 Observable 发送的事件(或者是还没有被发送的种子值/默认值),然后继续发送从源 Observable 接收到的其它事件。如下所示:

  • 如果源 Observable 被一个 error 中断,那么 BehaviorSubject 不会发送事件给后续的 observer,但会传递给它们 error 的信息。如下所示:

  • 使用示例:
let disposeBag = DisposeBag()
 
// 创建一个BehaviorSubject
let subject = BehaviorSubject(value: "1")
 
// 第1次订阅subject
subject.subscribe { event in
    print("第1次订阅:", event)
}.disposed(by: disposeBag)
 
// 发送next事件
subject.onNext("2")
 
// 发送error事件
subject.onError(NSError(domain: "dw", code: 0, userInfo: nil))
 
// 第2次订阅subject
subject.subscribe { event in
    print("第2次订阅:", event)
}.disposed(by: disposeBag)
  • 运行结果如下:
1次订阅: next(1)1次订阅: next(2)1次订阅: error(Error Domain=dw Code=0 "(null)")2次订阅: error(Error Domain=dw Code=0 "(null)")

五、ReplaySubject

  • ReplaySubject 在创建时候需要设置一个 bufferSize,表示它对于它发送过的 event 的缓存个数。比如一个 ReplaySubject 的 bufferSize 设置为 2,它发出了 3 个 .next 的 event,那么它会将后两个(最近的两个)event 给缓存起来。此时如果有一个 subscriber 订阅了这个 ReplaySubject,那么这个 subscriber 就会立即收到前面缓存的两个.next 的 event。
  • 如果一个 subscriber 订阅已经结束的 ReplaySubject,除了会收到缓存的 .next 的 event外,还会收到那个终结的 .error 或者 .complete 的event。
  • 如下所示,最上面是 ReplaySubject(bufferSize 设为为 2),下面分别表示两个新的订阅,它们订阅的时间点不同,可以发现 ReplaySubject 的订阅者一开始就能收到 ReplaySubject 之前发出的两个 Event(如果有):

  • 使用示例:
let disposeBag = DisposeBag()
 
// 创建一个bufferSize为2的ReplaySubject
let subject = ReplaySubject<String>.create(bufferSize: 2)
 
// 连续发送3个next事件
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")
 
// 第1次订阅subject
subject.subscribe { event in
    print("第1次订阅:", event)
}.disposed(by: disposeBag)
 
// 再发送1个next事件
subject.onNext("4")
 
// 第2次订阅subject
subject.subscribe { event in
    print("第2次订阅:", event)
}.disposed(by: disposeBag)
 
// 让subject结束
subject.onCompleted()
 
// 第3次订阅subject
subject.subscribe { event in
    print("第3次订阅:", event)
}.disposed(by: disposeBag)
  • 运行结果如下:
1次订阅: next(2)1次订阅: next(3)1次订阅: next(4)2次订阅: next(3)2次订阅: next(4)1次订阅: completed
第2次订阅: completed
第3次订阅: next(3)3次订阅: next(4)3次订阅: completed

六、Variable

  • Variable 其实就是对 BehaviorSubject 的封装,所以它也必须要通过一个默认的初始值进行创建,它具有 BehaviorSubject 的功能,能够向它的订阅者发出上一个 event 以及之后新创建的 event。
  • 不同的是,Variable 还会把当前发出的值保存为自己的状态,同时它会在销毁时自动发送 .complete 的 event,不需要也不能手动给 Variables 发送 completed 或者 error 事件来结束它。
  • 简单地说,就是 Variable 有一个 value 属性,改变这个 value 属性的值就相当于调用一般 Subjects 的 onNext() 方法,而这个最新的 onNext() 的值就被保存在 value 属性里,直到再次修改它。
  • Variables 本身没有 subscribe() 方法,但是所有 Subjects 都有一个 asObservable() 方法,可以使用这个方法返回这个 Variable 的 Observable 类型,拿到这个 Observable 类型就能订阅它。
  • 使用示例:
let disposeBag = DisposeBag()
                 
// 创建一个初始值为1的Variable
let variable = Variable("1")
         
// 修改value值
variable.value = "2"
         
// 第1次订阅
variable.asObservable().subscribe {
            print("第1次订阅:", $0)
}.disposed(by: disposeBag)
         
// 修改value值
variable.value = "3"
         
// 第2次订阅
variable.asObservable().subscribe {
	print("第2次订阅:", $0)
}.disposed(by: disposeBag)
         
// 修改value值
variable.value = "4"
  • 运行结果:
1次订阅: next(2)1次订阅: next(3)2次订阅: next(3)1次订阅: next(4)2次订阅: next(4)1次订阅: completed
第2次订阅: completed
  • Variable 虽然被废弃了,但是由于 Variable 的灵活性,因此在开发里面应用非常之多。

七、BehaviorRelay

  • BehaviorRelay 会替换原来的 Variable,储存一个信号,并且可以随时订阅响应,但响应发送的时候要注意 behaviorR.accept(20):
let disposeBag = DisposeBag()
let behaviorRelay = BehaviorRelay(value: 100)
behaviorRelay.subscribe(onNext: { (num) in
    print(num)
.disposed(by: disposeBag)
print("打印:\\(behaviorRelay.value)")

behaviorRelay.accept(1000)

八、Subject 原理分析

① SubjectType

  • SubjectType 继承自 ObservableType,具有序列特性,并且关联观察者类型,具备观察者类型的能力,如下:
public protocol SubjectType : ObservableType {
      // 关联观察者类型,具备观察者类型的能力
    associatedtype SubjectObserverType : ObserverType
    func asObserver() -> SubjectObserverType
}
  • 现有如下实例 subject:
let disposeBag = DisposeBag()
// 初始化序列
let publishSub = PublishSubject<Int>() 
// 发送响应序列
publishSub.onNext(1)
// 订阅序列
publishSub.subscribe { print("订阅到:",$0)}
    .disposed(by: disposeBag)
// 再次发送响应
publishSub.onNext(2)
publishSub.onNext(3)
  • PublishSubject 很明显能够订阅信号(序列最基本的能力),并且能够发送响应,又是观察者的能力。

② 订阅响应流程

public override func subscribe -> Disposable {
    self._lock.lock()
    let subscription = self._synchronized_subscribe(observer)
    self._lock.unlock()
    return subscription
}

func _synchronized_subscribe -> Disposable  {
    ......
    ......
    let key = self._observers.insert(observer.on)
    return SubscriptionDisposable(owner: self, key: key)
}
  • self._observers.insert(observer.on): 通过一个集合添加进去所有的订阅事件,很明显在合适的地方一次性全部执行。其中也返回这次订阅的销毁者,方便执行之后的工作:synchronizedUnsubscribe->self._observers.removeKey(disposeKey)。
mutating func removeKey(_ key: BagKey) -> T? {
    if _key0 == key {
        _key0 = nil
        let value = _value0!
        _value0 = nil
        return value
    }

    if let existingObject = _dictionary?.removeValue(forKey: key) {
        return existingObject
    }

    for i in 0 ..< _pairs.count where _pairs[i].key == key {
        let value = _pairs[i].value
        _pairs.remove(at: i)
        return value
    }
    return nil
}
  • 遍历通过 key 获取响应 bag 中的 value,执行集合移除,因为没有相应持有关系,达到自动释放销毁。

③ 发送信号流程

public func on(_ event: Event<Element>) {
	dispatch(self._synchronized_on(event), event)
}
  • 这里调用了 dispatch 函数,传了两个参数:self._synchronized_on(event) 和 event,查看 dispatch 函数源码:
func dispatch<E>(_ bag: Bag) {
    bag._value0?(event)

    if bag._onlyFastPath {
        return
    }

    let pairs = bag._pairs
    for i in 0 ..< pairs.count {
        pairs[i].value(event)
    }

    if let dictionary = bag._dictionary {
        for element in dictionary.values {
            element(event)
        }
    }
}
  • bag._value0?(event) 首先执行事件的回调,然后判断 bag._onlyFastPath 的情况,默认会开启快速通道,如果是开启慢速通道,需要从刚刚添加进 bag 包里面的匹配,挨个进行 pairs[i].value(event) 外界事件回调,然后拿回外界封装的闭包的闭包调用 :element(event)。
func _synchronized_on(_ event: Event<E>) -> Observers {
    self._lock.lock(); defer { self._lock.unlock() }
    switch event {
    case .next:
        if self._isDisposed || self._stopped {
            return Observers()
        }
        
        return self._observers
    case .completed, .error:
        if self._stoppedEvent == nil {
            self._stoppedEvent = event
            self._stopped = true
            let observers = self._observers
            self._observers.removeAll()
            return observers
        }

        return Observers()
    }
}
  • 如果 self._isDisposed || self._stopped 成立,就会返回一个空的集合,也就没有序列的响应。在 .completed, .error 都会改变状态 self._stopped = true,也就是说序列完成或者错误之后都无法再次响应。在.completed, .error 还会移除添加在集合里面的内容。
  • Subject 流程图如下(Subject 把订阅流程和响应流程都内部实现,所以没有必要引入 sink):

④ Subject 对比

  • PublishSubject、BehaviorSubject、ReplaySubject、AsyncSubject、Variable,它们之间既有各自的特点,也有相同之处:
    • 首先它们都是 Observable,它们的订阅者都能收到它们发出的新的 Event;
    • 直到 Subject 发出 .complete 或者 .error 的 Event 后,该 Subject 便终结,同时它也就不会再发出 .next 事件;
    • 对于那些在 Subject 终结后再订阅它的订阅者,也能收到 subject 发出的一条 .complete 或 .error 的 event,告诉新的订阅者它已经终结。
  • 它们之间最大的区别只是在于:当一个新的订阅者刚订阅它的时候,能不能收到 Subject 以前发出过的旧 Event,如果能的话又能收到多少个 Event。

以上是关于RxSwift之深入解析Subject的使用和实现原理的主要内容,如果未能解决你的问题,请参考以下文章

RxSwift之深入解析dispose源码的实现

RxSwift之深入解析map操作符的底层实现

RxSwift之深入解析特殊序列deallocating与deallocated的源码实现

RxSwift之深入解析Using操作的应用和原理

RxSwift之深入解析核心逻辑Observable的底层原理

RxSwift之深入解析URLSession的数据请求和数据处理