RxSwift之深入解析Using操作的应用和原理
Posted ╰つ栺尖篴夢ゞ
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxSwift之深入解析Using操作的应用和原理相关的知识,希望对你有一定的参考价值。
一、前言
- ReactiveX 官方文档 对于 Using 的描述如下:
create a disposable resource that has the same lifespan as the Observable
- 即创建一个和 Observable 具有相同生命周期的 disposable 资源。
- 可以看出:当一个 ObserverA 订阅 Using 返回的 Observable 时,Using 会使用调用者传入的 Resource 工厂方法 [resourceFactory] 创建对应的资源,并且使用 Observable 工厂方法 [observableFactory] 创建 ObserverA 实际上想要订阅的 Observable。当 ObserverA 终止时,对应的 Resource 也会被释放 [dispose]。
- 如下所示:
class MyDisposables: Disposable
func dispose()
print("dispose")
......
let _ = Observable
.using( () -> MyDisposables in
return MyDisposables()
) _ in
return Observable<Int>
.interval(1, scheduler: MainScheduler.instance)
.take(5)
.subscribe(onNext:
print($0)
)
- 执行结果:
0
1
2
3
4
dispose
- 可以看到,当 AnonymousObserver[匿名观察者] 订阅 using 返回的 Observable 时,using 内部创建了定期输出 Int 值的 ObservableA,以及资源 MyDisposables。在发送 5 个消息之后,ObservableA 被终止,与此同时,MyDisposables 资源被 using 释放。
二、监听 Obervable
- RxSwift 官方 Demo 中的一段关于 GitHub 登录的代码:
let signingIn = ActivityIndicator()
self.signingIn = signingIn.asObservable()
let usernameAndPassword = Observable.combineLatest(input.username, input.password) ($0, $1)
signedIn = input.loginTaps.withLatestFrom(usernameAndPassword)
.flatMapLatest (username, password) in
return API.signup(username, password: password)
.observeOn(MainScheduler.instance)
.catchErrorJustReturn(false)
.trackActivity(signingIn)
.flatMapLatest loggedIn -> Observable<Bool> in
let message = loggedIn ? "Mock: Signed in to GitHub." : "Mock: Sign in to GitHub failed"
return wireframe.promptFor(message, cancelAction: "OK", actions: [])
// propagate original value
.map _ in
loggedIn
.shareReplay(1)
- signingIn 是当前是否正在登录 Observable,signedIn 是当前登录动作 Observable。signedIn 体现的事件流如下:
-
- 按下登录按钮;
-
- 使用当前用户名及密码进行登录;
-
- 展示登录结果。
- Rx 可观察对象的交互如下:
- 上面的 GitHub 登录涉及到 Rx 的相关操作:
-
- combineLatest:合并最后的 username 和 password,形成一个新的 Observable;
-
- withLatestFrom:形成一个以 loginTaps 发送事件时间为采样时间点,发送 usernameAndPassword 内容的 Observable。
三、如何实现监听?
- 上面的 GitHub 登录涉及到记录开始登录的操作如下,那么它是如何监听到当前是否正在登录呢?
......
API.signup(username, password: password)
.observeOn(MainScheduler.instance)
.catchErrorJustReturn(false)
.trackActivity(signingIn)
......
public extension ObservableConvertibleType
public func trackActivity(_ activityIndicator: ActivityIndicator) -> Observable<E>
return activityIndicator.trackActivityOfObservable(self)
- 可以看到 .trackActivity(signingIn) 这个调用,一开始可能会理解为 .trackActivity(signingIn) 是在 signup(username, password: password) 后调用的,也就是说登录事件已经结束,程序才开始监听登录动作,这个理解显然是错误的。那么,要想获得正确的结果,事件流应该是一个怎么样的执行顺序呢?
- 最直白的想法应该如下:
-
- 设置当前状态为正在执行登录;
-
- 执行登录操作;
-
- 设置当前状态为没有执行登录。
- 那么问题来了:signup(username, password: password) 生成登录动作 Observable,当有 Observer 订阅这个 Observable 时,Observable 就会执行登陆操作,并发送对应的结果,这就造成 .trackActivity(signingIn) 不能直接返回上游传递过来的事件流,因为这样,刚好切合上面的那个假设,因此 .trackActivity(signingIn) 应该做到以下几件事情:
-
- 保留登录动作 ObservableA,返回自定义的一个 ObservableB;
-
- 当外部 Observer 订阅 ObservableB 时,设置当前状态为正在执行登录;
-
- 设置当前状态为正在执行登录,然后让外部的 Observer 重新订阅到 ObservableA;
-
- 登录操作执行完毕后,设置当前状态为没有执行登录。
- signingIn 所属类 ActivityIndicator 的实现如下:
public class ActivityIndicator : DriverConvertibleType
public typealias E = Bool
private let _lock = NSRecursiveLock()
private let _variable = Variable(0)
private let _loading: Driver<Bool>
public init()
_loading = _variable.asDriver()
.map $0 > 0
.distinctUntilChanged()
fileprivate func trackActivityOfObservable<O: ObservableConvertibleType>(_ source: O) -> Observable<O.E>
return Observable.using( () -> ActivityToken<O.E> in
self.increment()
return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
) t in
return t.asObservable()
private func increment()
_lock.lock()
_variable.value = _variable.value + 1
_lock.unlock()
private func decrement()
_lock.lock()
_variable.value = _variable.value - 1
_lock.unlock()
public func asDriver() -> Driver<E>
return _loading
- _variable 对应 Variable 类型,Variable 实际上是 BehaviorSubject 的一层包装,不同的是它只暴露数据,不会被终止或者失败。BehaviorSubject 会在订阅者订阅时,发送一个最近或初始数据,并且订阅者可以接收 BehaviorSubject 随后发送的所有数据。
- Variable 的示例如下:
let v = Variable(0)
v.asObservable()
.subscribe(onNext:
print($0)
)
v.value = 1
v.value = 2
- 执行结果:
0
1
2
- 继续查看 _loading 在 ActivityIndicator 的初始化方法中的赋值如下:
_loading = _variable.asDriver()
.map $0 > 0
.distinctUntilChanged()
- 其中 _variable 的初始值为 0,因此这部分的逻辑很容易理解:_loading 通过 _variable 发送的值是否大于 0 来判断当前是否在执行动作,并且通过 increment、decrement 方法来设置 _variable 发送的值(改变当前正在执行的动作数)。
- 重点还是在 trackActivityOfObservable 方法:
fileprivate func trackActivityOfObservable<O: ObservableConvertibleType>(_ source: O) -> Observable<O.E>
return Observable.using( () -> ActivityToken<O.E> in
self.increment()
return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
) t in
return t.asObservable()
- 对应的 resourceFactory 如下:
() -> ActivityToken<O.E> in
self.increment()
return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
- observableFactory 如下:
t in
return t.asObservable()
- ActivityToken 的实现如下:
private struct ActivityToken<E> : ObservableConvertibleType, Disposable
private let _source: Observable<E>
private let _dispose: Cancelable
init(source: Observable<E>, disposeAction: @escaping () -> ())
_source = source
_dispose = Disposables.create(with: disposeAction)
func dispose()
_dispose.dispose()
func asObservable() -> Observable<E>
return _source
- 可以看到,ActivityToken 就是一个保存当前需要监听的 Observable 的资源,当外部 Observer 订阅 trackActivityOfObservable 返回的 ObservableB 时,using 调用 resourceFactory 做了以下操作:
-
- 增加当前正在执行的动作数;
-
- 使用 ActivityToken 保存需要监听的 ObservableA,并且在 ActivityToken 释放时,恢复当前正在执行的动作数。
- 接下来在调用 observableFactory 时,using 把在 resourceFactory 中保存的 ObservableA 重新暴露给 Observer。通过这种方式,就能在 ObservableA 发送数据之前,执行额外的操作 self.increment(),也就是上面 .trackActivity(signingIn) 应该做到的 A2,再由于 using 会在 observableFactory 返回的 ObservableA 终止时释放 resourceFactory 创建的资源,所以当 ObservableA 终止时,会执行 self.decrement,也就是 A4。
- 监听当前是否正在登录就是通过 using 操作 hold 主需要监听的 Observable,然后在执行想要的额外动作后,重新暴露 Observable 给外部的 Observer。
四、using 内部实现
- using 的内部实现:
public static func using<R: Disposable>(_ resourceFactory: @escaping () throws -> R, observableFactory: @escaping (R) throws -> Observable<E>) -> Observable<E>
return Using(resourceFactory: resourceFactory, observableFactory: observableFactory)
- using 实际上返回的是一个 Using 类,Using 为 Producer 的子类,并且重载 run 方法,如下所示:
class Using<SourceType, ResourceType: Disposable>: Producer<SourceType>
typealias E = SourceType
typealias ResourceFactory = () throws -> ResourceType
typealias ObservableFactory = (ResourceType) throws -> Observable<SourceType>
fileprivate let _resourceFactory: ResourceFactory
fileprivate let _observableFactory: ObservableFactory
init(resourceFactory: @escaping ResourceFactory, observableFactory: @escaping ObservableFactory)
_resourceFactory = resourceFactory
_observableFactory = observableFactory
override func run<O : ObserverType>(_ observer: O) -> Disposable where O.E == E
let sink = UsingSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
- Producer 的实现如下:
class Producer<Element> : Observable<Element>
override init()
super.init()
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element
if !CurrentThreadScheduler.isScheduleRequired
return run(observer)
else
return CurrentThreadScheduler.instance.schedule(()) _ in
return self.run(observer)
func run<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element
abstractMethod()
- Producer 调用 subscribe 时,会调用子类的 run,并传入当前的 Oberver。回到 Using 的实现,Producer 的 run 方法中创建了 UsingSink 实例,并调用它的 run 方法。那么,来看下最关键的 UsingSink:
class UsingSink<SourceType, ResourceType: Disposable, O: ObserverType> : Sink<O>, ObserverType where O.E == SourceType
typealias Parent = Using<SourceType, ResourceType>
typealias E = O.E
private let _parent: Parent
init(parent: Parent, observer: O)
_parent = parent
super.init(observer: observer)
func run() -> Disposable
var disposable = Disposables.create()
do
let resource = try _parent._resourceFactory()
disposable = resource
let source = try _parent._observableFactory(resource)
return Disposables.create(
source.subscribe(self),
disposable
)
catch let error
return Disposables.create(
Observable.error(error).subscribe(self),
disposable
)
func on(_ event: Event<E>)
switch event
case let .next(value):
forwardOn(.next(value))
case let .error(error):
forwardOn(.error(error))
dispose()
case .completed:
forwardOn(.completed)
dispose()
- 可以看到,在 run 方法中,UsingSink 先是调用 _resourceFactory() 创建了资源 resource,然后以 resource 为参数调用 _observableFactory() 来创建想要的 Obervable,并通过 Disposables.create(source.subscribe(self),disposable) 让 resource 的生命周期和 Obervable 一致。
- 实际上 UsingSink 只是在 run 中做了两件特殊的事情:
-
- 在让 source 订阅自身前,创建 resource(一般会在这里做额外的操作);
-
- 使用的 source 不是由上游给的,而是通过 _observableFactory 创建的(一般的操作比如 map、flatMap 等都是由上游给的)。
以上是关于RxSwift之深入解析Using操作的应用和原理的主要内容,如果未能解决你的问题,请参考以下文章
RxSwift之深入解析核心逻辑Observable的底层原理