ReactiveX序列——RxSwift
Posted Jarlene
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ReactiveX序列——RxSwift相关的知识,希望对你有一定的参考价值。
ReactiveX序列——RxSwift
从本篇博客开始,我将叙述一序列ReactiveX模式的讲解,ReactiveX是微软推出的开源一个项目,里面包含了RxJava,RxJs,RxSwift,RxCpp,Rx.Net,Rxphp等一序列的Functional Reactive Programming(FRP,函数响应式编程)。从这篇博客开始我将详细介绍以上提到的6个语言RX的FRP,及其内部具体实现。
Swift是苹果公司新推出的一门现代化的编程语言,并且将其开源出来了,Swift具有很多的优点,这也使得这门语言推出的短时间引起了很大反应的原因,在最近的2016年3月的编程语言排行榜处于第14位,甚至超过了OC(15位)。可见Swift的在开发者心中的地位。
RxSwift的观察者对象(Observable)
在RxSwift中,可以有多种创建Observable对象的方法,主要有以下几种:
- asObservable
- create
- deferred
- empty
- error
- toObservable/from
- interval
- never
- just
- of
- range
- repeatElement
- timer
要弄明白Observable就要先弄清楚Observable是继承了哪些类和协议,从源码开始分析:
首先第一个是ObservableConvertibleType:
/**
Type that can be converted to observable sequence (`Observer<E>`).
*/
public protocol ObservableConvertibleType {
/**
Type of elements in sequence.
*/
typealias E
/**
Converts `self` to `Observable` sequence.
- returns: Observable sequence that represents `self`.
*/
func asObservable() -> Observable<E>
}
从ObservableConvertibleType协议源码可以看出,它定义了一个typealias类型别名和asObservable方法,类型别名是用来定义将要处理的类型(例如String,Int等等),而asObervable这个我们在后面会具体叙述。其次是ObservableType,它继承了ObservableConvertibleType,ObservableType主要干了两个事情,第一个是创建出subscribe方法,它是用来执行订阅事件的(onNext、onError/onComplete),第二个就是简易实现asObservable方法(通过extension ObservableType 实现),asObservable主要是通过Observable.create(subscrible())实现的。再上来就是Observable,它是一个类,继承了ObservableType协议接口。
下面我们分别对以上几种创建Observable对象做详细的介绍。
- asObservable方法:
asObservable其实是相当于clone方法,其内部实现如下:
public func asObservable() -> Observable<E> {
return self
}
从这里看,它return self也就是自己,这就意味着,你必须先有Observable对象才能调用asObservable方法。例如:
var obs = Observable<String>.create { (observer) -> Disposable in
observer.on(.Next("hahah"))
observer.on(.Next("deasd"))
observer.on(.Completed)
return NopDisposable.instance
}
let observable = obs.asObservable()
observable.subscribeOn(MainScheduler.instance)
.subscribe{ event in
print(event.debugDescription)
}
第二个是subscribe方法,这个方法具体实现调用了一个“抽象”方法,这个“抽象”方法就是打印出来一个错误日志并且停止运行。
public func subscribe<O: ObserverType where O.E == E>(observer: O) -> Disposable {
abstractMethod()
}
当然,这个Observable类中方法,但是extension Observable其实是有很多用法的。这也是我们上面提到创建Observable的各种方法。
2、create方法
public static func create(subscribe: (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
这是一个“静态方法”(在class中用static关键字标注,在struct和enum中使用class关键字标注),这个方法的参数是一个函数(通常我们会用闭包的方式),函数的参数是AnyObserver,返回的是Disposable。AnyObserver其实就是订阅者,Disposable是一个协议接口,里面只有一个dispose方法,用来释放一些资源。整个create方法返回的是一个AnonymousObservable(匿名Observable),AnonymousObservable继承自Producer,Producer实现了线程调度功能,可以安排某个线程来执行run方法。因此create方法返回的AnonymousObservable是可以运行在指定线程中Observable。完整的create例子:
var obs = Observable<String>
.create ({ (observer) -> Disposable in
observer.on(.Next("hahah"))
observer.on(.Next("deasd"))
observer.on(.Completed)
return NopDisposable.instance
})
.observeOn(MainScheduler.instance)
.subscribe({event in
if let str = event.element {
print(str)
}
})
//.dispose()
最后obs变量是一个Disposable类型变量,可以继续调用dispose方法释放资源。整个代码输出结果:
hahah
deasd
3、empty方法
public static func empty() -> Observable<E> {
return Empty<E>()
}
empty方法是一个空方法,里面没有onNext事件处理,只会处理onComplete方法。empty创建Observable对象比较简单。代码例子:
let obs1 = Observable<String>.empty()
obs1.subscribe(
onNext: {str in print(str)},
onError: { (errorType) -> Void in
print(errorType)
},
onCompleted: { () -> Void in
print("complete")
})
{ () -> Void in
print("dispose")
}
输出结果:
complete
dispose
这个例子中有四个闭包,其中最后一个是尾随闭包,而且这些闭包都是可选类型。当然你也可以如下写法:
let obs1 = Observable<String>.empty()
obs1.subscribe(
onNext: {str in
print(str)
},
onError: { (errorType) -> Void in
print(errorType)
},
onCompleted: { () -> Void in
print("complete")
},
onDisposed: {() -> Void in
print("dispose")
})
4、never方法
public static func never() -> Observable<E> {
return Never()
}
官方解释是返回一个无终止的观察者事件序列,可以用来表示无限持续时间。尽管我们给安排了next事件,但实际上,他是不会执行的。不会输出onNext
Observable<String>
.never()
.subscribeNext( { (str) -> Void in
print("onNext")
})
//.dispose()
5、just方法
public static func just(element: E, scheduler: ImmediateSchedulerType) -> Observable<E> {
return JustScheduled(element: element, scheduler: scheduler)
}
just方法只能处理单个事件,简单来说,我们使用just方法不能将一组数据一起处理,只能一个一个处理。例如:
Observable<String>
.just("just test")
.subscribeOn(MainScheduler.instance)
.subscribeNext({ (str) -> Void in
print(str)
})
.dispose()
输出结果:
just test
just方法是一个多态方法,允许在传入参数时候指定线程,例如:
它指定当前线程完成subscribe相关事件。
Observable<String>
.just("just with Scheduler", scheduler: CurrentThreadScheduler.instance)
.subscribeNext({ (str) -> Void in
print(str)
})
.dispose()
6、error方法
public static func error(error: ErrorType) -> Observable<E> {
return Error(error: error)
}
error方法是返回一个只能调用onError方法的Observable序列。其中的onNext和OnComleted方法是不会执行的。例如:
public static func error(error: ErrorType) -> Observable<E> {
return Error(error: error)
}
Observable<String>
.error(RxError.Timeout)
.subscribe(
onNext: { (str) -> Void in
print(str)
print("onNext")
},
onError: { (error)-> Void in
print(error)
},
onCompleted: { () -> Void in
print("onCompleted")
},
onDisposed: { () -> Void in
print("dispose")
})
.dispose()
最后的输出结果是:
Sequence timeout
dispose
7、of方法
可以说of方法是just方法的升级版,它可以将一序列的事情组合起来一起处理。极大方便了开发者对数组(Array)、字典(Dictionary)进行分布处理。
public static func of(elements: E ..., scheduler: ImmediateSchedulerType? = nil) -> Observable<E> {
return Sequence(elements: elements, scheduler: scheduler)
}
Observable<String>
.of("d1","d2", "d3", "d4")
.subscribe( { (event) -> Void in
if let els = event.element {
print(els)
}
})
.dispose()
这里解释一下subscribe(on: Event->Void)方法,例子中event.element在调用get属性的时候其实会执行一个onNext方法,它返回的是一个可选类型,因此要用if let解析处理。当然如果代码改成如下,那么是不会输出结果的,因为event.error执行的是错误监听(也就是执行的onError方法,因此不会输出结果)。of和just一样,存在一个多态方法,可以带入线程控制。
Observable<String>
.of("d1","d2", "d3", "d4")
.subscribe( { (event) -> Void in
if let els = event.error{
print(els)
}
})
.dispose()
8、deferred方法
deferred方法是延时创建Observable对象,当subscribe的时候才去创建,它为每一个bserver创建一个新的Observable; deferred采用一个Factory函数型作为参数,Factory函数返回的是Observable类型。这也是其延时创建Observable的主要实现。
public static func deferred(observableFactory: () throws -> Observable<E>)
-> Observable<E> {
return Deferred(observableFactory: observableFactory)
}
整个deferred方法的原理如上图,从图中可以看出,deferred不是第一步创建Observable,而是在subscriber的时候创建的。(图中红色的是error,绿色的是next事件)
9、generate方法
public static func generate(initialState initialState: E, condition: E throws -> Bool, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance, iterate: E throws -> E) -> Observable<E> {
return Generate(initialState: initialState, condition: condition, iterate: iterate, resultSelector: { $0 }, scheduler: scheduler)
}
generate方法是一个迭代器,它一直循环onNext事件,直到condition不满足要求退出。generate有四个参数,第一个是最开始的循环变量,第二个是条件,第三个是迭代器,这个迭代器每次运行都会返回一个E类型,作为下一次是否执行onNext事件源,而是否正的要执行则看是否满足condition条件。其实我们可以理解generate就是一个循环体(其内部实现也正是一个循环,代码在:GenerateSink的run方法中)。例子:
Observable<String>
.generate(
initialState: "ah",
condition: ({ (str) -> Bool in
return str.hasPrefix("ah")
}),
iterate: ({ (str1) -> String in
return "h" + str1
}))
//.subscribeOn(MainScheduler.instance)
.subscribe ({ (event) -> Void in
if let res = event.element {
print(res)
}
})
.dispose()
输出结果:
ah
上面这个例子说的是,初始的变量是“ah”,第一个条件满足,执行onNext事件,同时生成一个hah,不满足条件,不执行onNext事件。generate是一个具有高度可变的of方法,它同时兼备了后面要介绍的过滤(filter)特性。当然generate还有一个多态方法,允许传入执行线程。这个线程是为循环体而生的,并不是为subscrible而生的。
10、repeatElement方法
public static func repeatElement(element: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return RepeatElement(element: element, scheduler: scheduler)
}
repeatElement方法是一个无限循环的,它会一直循环onNext方法。当然这种循环是可以指定线程的。例子:
Observable<String>
.repeatElement("daa")
.subscribeNext { (str) -> Void in
print(str)
}
.dispose()
其中subscribeNext是一个尾随闭包。
11、using方法
public static func using<R: Disposable>(resourceFactory: () throws -> R, observableFactory: R throws -> Observable<E>) -> Observable<E> {
return Using(resourceFactory: resourceFactory, observableFactory: observableFactory)
}
using方法是通过Factory方法生成一个对象(resourceFactory)再转换成Observable,中间我们要使用Factory方法,上面已经介绍过一次Factory方法。using方法相对其他的方法比较复杂和特殊,原因是using方法是由我们构建出资源和构建清除资源的,中间通过一个转换生成Observable对象。
Observable<String>
.using( { () -> Student<String> in
return Student(source: Observable<String>.just("jarlene"), disposeAction: { () -> () in
print("hah")
})
},
observableFactory: { (r) -> Observable<String> in
return r.asObservable()
})
.subscribeNext( { (ss) -> Void in
print(ss)
})
.dispose()
其中Student类继承了两个协议:ObservableConvertibleType和Disposable;ObservableConvertibleType是为了生成Observable对象(通过调用asObservable方法),Disposable是为了清除资源。源码如下:
class Student<E>: ObservableConvertibleType, Disposable{
private let _source: Observable<E>
private let _dispose: AnonymousDisposable
init(source: Observable<E>, disposeAction: () -> ()) {
_source = source
_dispose = AnonymousDisposable(disposeAction)
}
func dispose() {
_dispose.dispose()
}
func asObservable() -> Observable<E> {
return _source
}
var name :String{
get {
return self.name
}
set {
self.name = newValue
}
}
}
在上面例子中,我们采用Observable.just方法生成了一个Observable对象传递给Student对象,同时也定义了一个释放资源的方法。等到调用dispose()方法,就会执行我们定义的释放资源的方法。例子结果为:
jarlene
hah
12、range方法
public static func range(start start: E, count: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return RangeProducer<E>(start: start, count: count, scheduler: scheduler)
}
range方法其实方便版of方法,其功能和of差不多,我们只要输出start和count然后就能生成一组数据,让他们执行onNext。值得注意的是,range方法只生成Observable型。在调用bindNext的时候可以将其对应成其他相应的类型。
例如:
let arr: [String] = ["ad", "cd", "ef", "gh"]
Observable<Int>
.range(start: 1, count: 3)
.subscribeNext { (n) -> Void in
print(arr[n])
}
.dispose()
结果
cd
ef
gh
13、toObservable(from)
public func toObservable(scheduler: ImmediateSchedulerType? = nil) -> Observable<Generator.Element> {
return Sequence(elements: self, scheduler: scheduler)
}
toObservable方法是扩展自Array,是将一个一个array转换成Observable,其内部实调用了一个序列Sequence,其用法很简单。
let arr: [String] = ["ab", "cd", "ef", "gh"]
arr.toObservable()
.subscribeNext { (s) -> Void in
print(s)
}
.dispose()
运行结果:
ab
cd
ef
gh
14、interval/timer
public static func interval(period: RxTimeInterval, scheduler: SchedulerType)
-> Observable<E> {
return Timer(dueTime: period,
period: period,
scheduler: scheduler
)
}
public static func timer(dueTime: RxTimeInterval, period: RxTimeInterval? = nil, scheduler: SchedulerType)
-> Observable<E> {
return Timer(
dueTime: dueTime,
period: period,
scheduler: scheduler
)
}
interval方法是定时产生一个序列,interval第一个参数就是时间间隔,第二个参数是指定线程。 可以看出interval是range和repeateElement的结合。timer方法和interval方法类似。差别在于timer可以设置间隔时间和持续时间,而interval的间隔时间和持续时间是一样的。
至此,我们将Observable对象基本的产生方法都讲述完了,下一节开始我们详细讲述Observer的创建以及制作器Producer,其次将详细叙述Producer和事件方法onNext、onError、onComplete之间的联系,以及Producer是怎么调度线程来完成线程控制的。
RxSwift的观察者对象的变换(Transform Observable)和过滤(Filter Observable)
对观察着对象进行变换使得一个对象变换成另一个对象,这个是RxSwift核心之一,因此对于熟悉RxSwift特别重要。RxSwift存在以下变换方法:
- buffer
- flatMap
- flatMapFirst
- flatMapLatest
- map
- scan
- window
过滤方法
- debounce / throttle
- distinctUntilChanged
- elementAt
- filter
- sample
- skip
- take
- takeLast
- single
下面我们分别对以上几种对Observable对象变换做详细的介绍(不全部阐述)。
1、 buffer方法:
buffer方法是extension ObservableType中的一个方法,它的作用是缓冲组合,第一个参数是缓冲时间,第二个参数是缓冲个数,第三个参数是线程。总体来说就是经过一定时间,将一定个数的事件组合成一个数组,一起处理,在组合的过程中,你可以选择线程。
public func buffer(timeSpan timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
-> Observable<[E]> {
return BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
}
例子:
Observable<String>
.of("ab", "cd", "ef", "gh")
.buffer(timeSpan: 1, count: 2, scheduler: MainScheduler.instance)
.subscribeNext({ (n) -> Void in
if !n.isEmpty {
print(n)
}
})
.dispose()
输出结果
["ab", "cd"]
["ef", "gh"]
2、flatMap
flatMap也是扩展自ObservableType,它的作用是将一种类型变换成另一种类型。flatMap的参数是一个方法,这个方法的输入参数与Observable的E是同一种类型,输出依然是Observable类型。
public func flatMap<O: ObservableConvertibleType>(selector: (E) throws -> O)
-> Observable<O.E> {
return FlatMap(source: asObservable(), selector: selector)
}
我们看一个例子,例子中首先是一组Observable,通过flatMap后还是一组Observable,但是flatMap作用是,如果元素中遇到“a”字母开头的,那么它就重新组装一个数组,这个数组是只有元素和“a”;如果元素不是“a”字母开头的就与“b”字母组装成另一个数组。这两种情况都通过调用toObservable返回Observable。flatMapFirst、flatMapLast、flatMapWithIndex都是类似的作用,这里就不重复。
Observable<String>
.of("ab", "cd", "aef", "gh")
.flatMap({ (element: String) -> Observable<String> in
if element.hasPrefix("a") {
let sd : [String] = [element, "a"]
return sd.toObservable()
} else {
let sd : [String] = [element, "b"]
return sd.toObservable()
}
})
.subscribeNext({ (n) -> Void in
if !n.isEmpty {
print(n)
}
})
.dispose()
结果
ab
a
cd
b
aef
a
gh
b
3、map
map方法是通过其实flatMap的简化版本,它返回的可以是任何类型。其中R是返回类型。
public func map<R>(selector: Self.E throws -> R) -> RxSwift.Observable<R>
例子:
Observable<String>
.of("ab", "cd", "aef", "gh")
.map({ (str) -> String in
return str+"ss"
})
.subscribeNext({ (n) -> Void in
if !n.isEmpty {
print(n)
}
})
.dispose()
结果
abss
cdss
aefss
ghss
4、scan方法
scan方法有两个参数,第一个参数是种子,第二个参数是加速器。所谓的种子就是最初的状态,加速器就是将每一次运行的结果延续到下一次。scan方法也是扩展自ObservableType
public func scan<A>(seed: A, accumulator: (A, E) throws -> A)
-> Observable<A> {
return Scan(source: self.asObservable(), seed: seed, accumulator: accumulator)
}
例子:
Observable<String>
.of("a", "b", "c", "d", "e")
.scan("s", accumulator: { (a, b) -> String in
return a+b
})
.subscribeNext({ (n) -> Void in
print(n)
})
.dispose()
这个例子中是将所有的字符依次串起来,运行结果是:
sa
sab
sabc
sabcd
sabcde
5、window
window方法同样扩展自ObservableType,它有三个参数,第一个是时间间隔,第二个是数量,第三个是线程。时间间隔指的的是window方法开窗的时间间隔;第二个参数数量指的的是每次通过窗口的个数;线程就是这种操作执行在什么线程上。起源码如下:
public func window(timeSpan timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
-> Observable<Observable<E>> {
return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
}
需要特别注意的是window方法之后,返回的是Observable
Observable<String>
.of("ab", "bc", "cd", "de", "ef")
.window(timeSpan: 1, count: 2, scheduler: MainScheduler.instance)
.subscribeNext({ (n) -> Void in
n.subscribeNext({ (ss) -> Void in
print(ss)
})
})
.dispose()
结果:
ab
bc
cd
de
ef
变换的方法基本就这些,但是开发者可以通过自定义的方式扩展变换的方法以达到所需的目的。接下来我们看看过滤方法。
1、debounce / throttle
debounce/throttle 方法在规定的时间中过滤序列元素,就如上图描述的一样,当debounce打开的时候,刚好那个黄色的序列元素过来,那么它就不会通知到事件(onNext、onError、onComplete)上去。下面是debounce方法源码。
public func throttle(dueTime: RxTimeInterval, scheduler: SchedulerType)
-> Observable<E> {
return Throttle(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
}
例子:
Observable<String>
.of("a", "b", "c", "d", "e", "f")
.debounce(1, scheduler: MainScheduler.instance)
.subscribeNext { (str) -> Void in
print(str)
}
.dispose()
输出结果
f
2、distinctUntilChanged
distinctUntilChanged 主要是过滤相邻两个元素是否重复,重复的话就过滤掉其中之一。
public func distinctUntilChanged<K>(keySelector: (E) throws -> K, comparer: (lhs: K, rhs: K) throws -> Bool)
-> Observable<E> {
return DistinctUntilChanged(source: self.asObservable(), selector: keySelector, comparer: comparer)
}
例子:
Observable<String>
.of("a", "a", "c", "e", "e", "f")
.distinctUntilChanged({ (lhs, rhs) -> Bool in
return lhs==rhs
})
.subscribeNext { (str) -> Void in
print(str)
}
.dispose()
输出结果:
a
c
e
f
3、elementAt
elementAt方法其实就挑选出所需要的序列元素,上图描述的很清楚。
这个方法很简单。没有什么难点。当index超界的时候,throwOnEmpty参数是否抛出异常。
public func elementAt(index: Int)
-> Observable<E> {
return ElementAt(source: asObservable(), index: index, throwOnEmpty: true)
}
例子:
Observable<String>
.of("aa", "av", "cs", "ed", "ee", "ff")
.elementAt(2)
.subscribeNext { (str) -> Void in
print(str)
}
.dispose()
输出结果
cs
4、filter
filter方法很简单,指出过滤条件就行,满足过滤条件的就能执行事件通知,否则不行
public func filter(predicate: (E) throws -> Bool)
-> Observable<E> {
return Filter(source: asObservable(), predicate: predicate)
}
例子:
Observable<String>
.of("aa", "av", "cs", "ed", "ee", "ff")
.filter({ (ss) -> Bool in
return ss.hasPrefix("a")
})
.subscribeNext { (str) -> Void in
print(str)
}
.dispose()
输出结果
aa
av
接下来的几个方法都是类似的,这里不就在详细叙述啦。
RxSwift的Observable事件处理以及线程调度
由第一部分可以知道,几乎在创建所有的Observable的时候都要用到Producer,而在事件处理(onNext、onError、onComplete)过程中经常要用到线程调度(Scheduler),它们之间存在一种很巧妙的设计。首先先看看Producer源码。
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
if !CurrentThreadScheduler.isScheduleRequired {
return run(observer)
} else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
return self.run(observer)
}
}
}
func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
abstractMethod()
}
}
Producer是继承了Observable类,我们在创建Observable类时候都用到了Producer,那么Producer主要做了两件事情,第一个实现subscribe方法,在subscribe方法中传入了observer参数,observer类型是ObserverType,在上一部分介绍了ObserverType中有一个类型别名E,那么在Producer的范型element就必须和ObserverType中类型别名E一样。回过头来说subscribe,我们首先看CurrentThreadScheduler 的源码,CurrentThreadScheduler 是继承ImmediateSchedulerType协议,它里面就定义了一个方法:
func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable
而这个方法在CurrentThreadScheduler 具体实现了。
public class CurrentThreadScheduler : ImmediateSchedulerType {
typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
/**
The singleton instance of the current thread scheduler.
*/
public static let instance = CurrentThreadScheduler()
static var queue : ScheduleQueue? {
get {
return NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKeyInstance)
}
set {
NSThread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKeyInstance)
}
}
/**
Gets a value that indicates whether the caller must call a `schedule` method.
*/
public static private(set) var isScheduleRequired: Bool {
get {
let value: CurrentThreadSchedulerValue? = NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerKeyInstance)
return value == nil
}
set(isScheduleRequired) {
NSThread.setThreadLocalStorageValue(isScheduleRequired ? nil : CurrentThreadSchedulerValueInstance, forKey: CurrentThreadSchedulerKeyInstance)
}
}
/**
Schedules an action to be executed as soon as possible on current thread.
If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be
automatically installed and uninstalled after all work is performed.
- parameter state: State passed to the action to be executed.
- parameter action: Action to be executed.
- returns: The disposable object used to cancel the scheduled action (best effort).
*/
public func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable {
if CurrentThreadScheduler.isScheduleRequired {
CurrentThreadScheduler.isScheduleRequired = false
let disposable = action(state)
defer {
CurrentThreadScheduler.isScheduleRequired = true
CurrentThreadScheduler.queue = nil
}
guard let queue = CurrentThreadScheduler.queue else {
return disposable
}
while let latest = queue.value.dequeue() {
if latest.disposed {
continue
}
latest.invoke()
}
return disposable
}
let existingQueue = CurrentThreadScheduler.queue
let queue: RxMutableBox<Queue<ScheduledItemType>>
if let existingQueue = existingQueue {
queue = existingQueue
}
else {
queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
CurrentThreadScheduler.queue = queue
}
let scheduledItem = ScheduledItem(action: action, state: state)
queue.value.enqueue(scheduledItem)
return scheduledItem
}
}
其实主要是根据CurrentThreadScheduler.isScheduleRequired参数来选择是否需要当前线程运行,如果需要,首调用action方法,而这个action方法其实就是onNext、onError、onCompelete方法。然后做了一个延迟清除(defer)和一个判断(guard)。然后循环一个queue其实主要是看看是否还有没有执行完的onNext时间。latest.invoke()其实就是做action(state),然后返回Disposable。如果不需要,则组合queue,生成Disposable返回。接下来我们看看怎么设置线程执行的,首选看看subscribleOn方法,这个方法就是指定接下来事情要发生在那个线程中,具体看一下代码:
public func observeOn(scheduler: ImmediateSchedulerType)
-> Observable<E> {
if let scheduler = scheduler as? SerialDispatchQueueScheduler {
return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
}
else {
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}
}
方法是定义在extension ObservableType 中的,它指定ObservableType 运行线程,这里面指定了两种运行方式,第一种是运行ObserveOnSerialDispatchQueue,第二种是ObserveOn这两个都继承自Producer,上面我们已经叙述了Producer,不管是ObserveOnSerialDispatchQueue还是ObserveOn都重写了run方法,他们返回的都是ObserverBase。ObserverBase其实就是在执行onNext、onError、onComplete方法。
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private var _isStopped: AtomicInt = 0
func on(event: Event<E>) {
switch event {
case .Next:
if _isStopped == 0 {
onCore(event)
}
case .Error, .Completed:
if !AtomicCompareAndSwap(0, 1, &_isStopped) {
return
}
onCore(event)
}
}
func onCore(event: Event<E>) {
abstractMethod()
}
func dispose() {
_isStopped = 1
}
}
onCore方法是由继承者实现,比如在ObserveOnSink类中及具体实现了onCore方法
使用 ReactiveX 进行渐进式图像加载