RxSwift之深入解析Using操作的应用和原理

Posted ╰つ栺尖篴夢ゞ

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxSwift之深入解析Using操作的应用和原理相关的知识,希望对你有一定的参考价值。

一、前言

create a disposable resource that has the same lifespan as the Observable
  • 即创建一个和 Observable 具有相同生命周期的 disposable 资源。

  • 可以看出:当一个 ObserverA 订阅 Using 返回的 Observable 时,Using 会使用调用者传入的 Resource 工厂方法 [resourceFactory] 创建对应的资源,并且使用 Observable 工厂方法 [observableFactory] 创建 ObserverA 实际上想要订阅的 Observable。当 ObserverA 终止时,对应的 Resource 也会被释放 [dispose]。
  • 如下所示:
class MyDisposables: Disposable 
    func dispose() 
        print("dispose")
    


......

let _ = Observable
    .using( () -> MyDisposables in
        return MyDisposables()
    )  _ in
        return Observable<Int>
            .interval(1, scheduler: MainScheduler.instance)
            .take(5)
    
    .subscribe(onNext: 
        print($0)
    )
  • 执行结果:
0
1
2
3
4
dispose
  • 可以看到,当 AnonymousObserver[匿名观察者] 订阅 using 返回的 Observable 时,using 内部创建了定期输出 Int 值的 ObservableA,以及资源 MyDisposables。在发送 5 个消息之后,ObservableA 被终止,与此同时,MyDisposables 资源被 using 释放。

二、监听 Obervable

  • RxSwift 官方 Demo 中的一段关于 GitHub 登录的代码:
let signingIn = ActivityIndicator()
self.signingIn = signingIn.asObservable()

let usernameAndPassword = Observable.combineLatest(input.username, input.password)  ($0, $1) 

signedIn = input.loginTaps.withLatestFrom(usernameAndPassword)
    .flatMapLatest  (username, password) in
        return API.signup(username, password: password)
            .observeOn(MainScheduler.instance)
            .catchErrorJustReturn(false)
            .trackActivity(signingIn)
    
    .flatMapLatest  loggedIn -> Observable<Bool> in
        let message = loggedIn ? "Mock: Signed in to GitHub." : "Mock: Sign in to GitHub failed"
        return wireframe.promptFor(message, cancelAction: "OK", actions: [])
            // propagate original value
            .map  _ in
                loggedIn
            
    
    .shareReplay(1)
  • signingIn 是当前是否正在登录 Observable,signedIn 是当前登录动作 Observable。signedIn 体现的事件流如下:
    • 按下登录按钮;
    • 使用当前用户名及密码进行登录;
    • 展示登录结果。
  • Rx 可观察对象的交互如下:

  • 上面的 GitHub 登录涉及到 Rx 的相关操作:
    • combineLatest:合并最后的 username 和 password,形成一个新的 Observable;
    • withLatestFrom:形成一个以 loginTaps 发送事件时间为采样时间点,发送 usernameAndPassword 内容的 Observable。

三、如何实现监听?

  • 上面的 GitHub 登录涉及到记录开始登录的操作如下,那么它是如何监听到当前是否正在登录呢?
......

API.signup(username, password: password)
.observeOn(MainScheduler.instance)
.catchErrorJustReturn(false)
.trackActivity(signingIn)

......

public extension ObservableConvertibleType 
    public func trackActivity(_ activityIndicator: ActivityIndicator) -> Observable<E> 
        return activityIndicator.trackActivityOfObservable(self)
    

  • 可以看到 .trackActivity(signingIn) 这个调用,一开始可能会理解为 .trackActivity(signingIn) 是在 signup(username, password: password) 后调用的,也就是说登录事件已经结束,程序才开始监听登录动作,这个理解显然是错误的。那么,要想获得正确的结果,事件流应该是一个怎么样的执行顺序呢?
  • 最直白的想法应该如下:
    • 设置当前状态为正在执行登录;
    • 执行登录操作;
    • 设置当前状态为没有执行登录。
  • 那么问题来了:signup(username, password: password) 生成登录动作 Observable,当有 Observer 订阅这个 Observable 时,Observable 就会执行登陆操作,并发送对应的结果,这就造成 .trackActivity(signingIn) 不能直接返回上游传递过来的事件流,因为这样,刚好切合上面的那个假设,因此 .trackActivity(signingIn) 应该做到以下几件事情:
    • 保留登录动作 ObservableA,返回自定义的一个 ObservableB;
    • 当外部 Observer 订阅 ObservableB 时,设置当前状态为正在执行登录;
    • 设置当前状态为正在执行登录,然后让外部的 Observer 重新订阅到 ObservableA;
    • 登录操作执行完毕后,设置当前状态为没有执行登录。
  • signingIn 所属类 ActivityIndicator 的实现如下:
public class ActivityIndicator : DriverConvertibleType 
    public typealias E = Bool

    private let _lock = NSRecursiveLock()
    private let _variable = Variable(0)
    private let _loading: Driver<Bool>

    public init() 
        _loading = _variable.asDriver()
            .map  $0 > 0 
            .distinctUntilChanged()
    

    fileprivate func trackActivityOfObservable<O: ObservableConvertibleType>(_ source: O) -> Observable<O.E> 
        return Observable.using( () -> ActivityToken<O.E> in
            self.increment()
            return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
        )  t in
            return t.asObservable()
        
    

    private func increment() 
        _lock.lock()
        _variable.value = _variable.value + 1
        _lock.unlock()
    

    private func decrement() 
        _lock.lock()
        _variable.value = _variable.value - 1
        _lock.unlock()
    

    public func asDriver() -> Driver<E> 
        return _loading
    

  • _variable 对应 Variable 类型,Variable 实际上是 BehaviorSubject 的一层包装,不同的是它只暴露数据,不会被终止或者失败。BehaviorSubject 会在订阅者订阅时,发送一个最近或初始数据,并且订阅者可以接收 BehaviorSubject 随后发送的所有数据。
  • Variable 的示例如下:
let v = Variable(0)
v.asObservable()
    .subscribe(onNext: 
        print($0)
    )

v.value = 1
v.value = 2
  • 执行结果:
0
1
2
  • 继续查看 _loading 在 ActivityIndicator 的初始化方法中的赋值如下:
_loading = _variable.asDriver()
	.map  $0 > 0 
	.distinctUntilChanged()
  • 其中 _variable 的初始值为 0,因此这部分的逻辑很容易理解:_loading 通过 _variable 发送的值是否大于 0 来判断当前是否在执行动作,并且通过 increment、decrement 方法来设置 _variable 发送的值(改变当前正在执行的动作数)。
  • 重点还是在 trackActivityOfObservable 方法:
fileprivate func trackActivityOfObservable<O: ObservableConvertibleType>(_ source: O) -> Observable<O.E> 
    return Observable.using( () -> ActivityToken<O.E> in
        self.increment()
        return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
    )  t in
        return t.asObservable()
    

  • 对应的 resourceFactory 如下:
 () -> ActivityToken<O.E> in
        self.increment()
        return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
    
  • observableFactory 如下:
 t in
        return t.asObservable()
    
  • ActivityToken 的实现如下:
private struct ActivityToken<E> : ObservableConvertibleType, Disposable 
    private let _source: Observable<E>
    private let _dispose: Cancelable

    init(source: Observable<E>, disposeAction: @escaping () -> ()) 
        _source = source
        _dispose = Disposables.create(with: disposeAction)
    

    func dispose() 
        _dispose.dispose()
    

    func asObservable() -> Observable<E> 
        return _source
    

  • 可以看到,ActivityToken 就是一个保存当前需要监听的 Observable 的资源,当外部 Observer 订阅 trackActivityOfObservable 返回的 ObservableB 时,using 调用 resourceFactory 做了以下操作:
    • 增加当前正在执行的动作数;
    • 使用 ActivityToken 保存需要监听的 ObservableA,并且在 ActivityToken 释放时,恢复当前正在执行的动作数。
  • 接下来在调用 observableFactory 时,using 把在 resourceFactory 中保存的 ObservableA 重新暴露给 Observer。通过这种方式,就能在 ObservableA 发送数据之前,执行额外的操作 self.increment(),也就是上面 .trackActivity(signingIn) 应该做到的 A2,再由于 using 会在 observableFactory 返回的 ObservableA 终止时释放 resourceFactory 创建的资源,所以当 ObservableA 终止时,会执行 self.decrement,也就是 A4。
  • 监听当前是否正在登录就是通过 using 操作 hold 主需要监听的 Observable,然后在执行想要的额外动作后,重新暴露 Observable 给外部的 Observer。

四、using 内部实现

  • using 的内部实现:
public static func using<R: Disposable>(_ resourceFactory: @escaping () throws -> R, observableFactory: @escaping (R) throws -> Observable<E>) -> Observable<E> 
        return Using(resourceFactory: resourceFactory, observableFactory: observableFactory)
    
  • using 实际上返回的是一个 Using 类,Using 为 Producer 的子类,并且重载 run 方法,如下所示:
class Using<SourceType, ResourceType: Disposable>: Producer<SourceType> 
    
    typealias E = SourceType
    
    typealias ResourceFactory = () throws -> ResourceType
    typealias ObservableFactory = (ResourceType) throws -> Observable<SourceType>
    
    fileprivate let _resourceFactory: ResourceFactory
    fileprivate let _observableFactory: ObservableFactory
    
    
    init(resourceFactory: @escaping ResourceFactory, observableFactory: @escaping ObservableFactory) 
        _resourceFactory = resourceFactory
        _observableFactory = observableFactory
    
    
    override func run<O : ObserverType>(_ observer: O) -> Disposable where O.E == E 
        let sink = UsingSink(parent: self, observer: observer)
        sink.disposable = sink.run()
        return sink
    

  • Producer 的实现如下:
class Producer<Element> : Observable<Element> 
    override init() 
        super.init()
    
    
    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element 
        if !CurrentThreadScheduler.isScheduleRequired 
            return run(observer)
        
        else 
            return CurrentThreadScheduler.instance.schedule(())  _ in
                return self.run(observer)
            
        
    
    
    func run<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element 
        abstractMethod()
    

  • Producer 调用 subscribe 时,会调用子类的 run,并传入当前的 Oberver。回到 Using 的实现,Producer 的 run 方法中创建了 UsingSink 实例,并调用它的 run 方法。那么,来看下最关键的 UsingSink:
class UsingSink<SourceType, ResourceType: Disposable, O: ObserverType> : Sink<O>, ObserverType where O.E == SourceType 

    typealias Parent = Using<SourceType, ResourceType>
    typealias E = O.E

    private let _parent: Parent
    
    init(parent: Parent, observer: O) 
        _parent = parent
        super.init(observer: observer)
    
    
    func run() -> Disposable 
        var disposable = Disposables.create()
        
        do 
            let resource = try _parent._resourceFactory()
            disposable = resource
            let source = try _parent._observableFactory(resource)
            
            return Disposables.create(
                source.subscribe(self),
                disposable
            )
         catch let error 
            return Disposables.create(
                Observable.error(error).subscribe(self),
                disposable
            )
        
    
    
    func on(_ event: Event<E>) 
        switch event 
        case let .next(value):
            forwardOn(.next(value))
        case let .error(error):
            forwardOn(.error(error))
            dispose()
        case .completed:
            forwardOn(.completed)
            dispose()
        
    

  • 可以看到,在 run 方法中,UsingSink 先是调用 _resourceFactory() 创建了资源 resource,然后以 resource 为参数调用 _observableFactory() 来创建想要的 Obervable,并通过 Disposables.create(source.subscribe(self),disposable) 让 resource 的生命周期和 Obervable 一致。
  • 实际上 UsingSink 只是在 run 中做了两件特殊的事情:
    • 在让 source 订阅自身前,创建 resource(一般会在这里做额外的操作);
    • 使用的 source 不是由上游给的,而是通过 _observableFactory 创建的(一般的操作比如 map、flatMap 等都是由上游给的)。

以上是关于RxSwift之深入解析Using操作的应用和原理的主要内容,如果未能解决你的问题,请参考以下文章

RxSwift之深入解析核心逻辑Observable的底层原理

RxSwift之深入解析map操作符的底层实现

RxSwift之深入解析dispose源码的实现

RxSwift之深入解析场景特征序列的使用和底层实现

RxSwift之深入解析URLSession的数据请求和数据处理

RxSwift之深入解析Observable序列的创建