RxSwift之深入解析Observable序列的创建

Posted Forever_wj

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxSwift之深入解析Observable序列的创建相关的知识,希望对你有一定的参考价值。

一、emty

  • 创建一个空的序列:本来序列事件是 Int 类型的,调用 emty 函数没有序列,只能complete:
	let emtyOb = Observable<Int>.empty()
	_ = emtyOb.subscribe(onNext: { (number) in
	    print("订阅:",number)
	}, onError: { (error) in
	    print("error:",error)
	}, onCompleted: {
	    print("完成回调")
	}) {
	    print("释放回调")
	}
  • 这种方式不常用,但是以点及面展开分析,通过源码解析查看:
	override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
	    observer.on(.completed)
	    return Disposables.create()
	}
  • 很明显在订阅的时候,直接 observer.on(.completed) 发送了完成信号,非常简洁:

二、just

  • 单个信号序列创建,该方法通过传入一个默认值来初始化,构建一个只有一个元素的 Observable 队列,订阅完信息自动 complete。
  • 如下所示,显示地标注出了 Observable 的类型为 Observable<[String]>,即指定了 Observable 所发出的事件携带的数据类型必须是String 类型的:
	// 单个信号序列创建
	let array = ["A","B"]
	Observable<[String]>.just(array)
	    .subscribe { (event) in
	        print(event)
	    }.disposed(by: disposeBag)
	
	_ = Observable<[String]>.just(array).subscribe(onNext: { (number) in
	    print("订阅:",number)
	}, onError: { (error) in
	    print("error:",error)
	}, onCompleted: {
	    print("完成回调")
	}) {
	    print("释放回调")
	}
  • just 序列在平时开发里面还是应用挺多的,底层源码如下:
	override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
	    observer.on(.next(self._element))
	    observer.on(.completed)
	    return Disposables.create()
	}
  • observer.on(.next(self._element)) 常规订阅之后就会发送 .next 事件,之后就会自动发送完成事件:

三、of

  • 此方法创建一个新的可观察实例,该实例具有可变数量的元素。
  • 该方法可以接受可变数量的参数(必需要是同类型的):
	// 多个元素 - 针对序列处理
	Observable<String>.of("A","B")
	    .subscribe { (event) in
	        print(event)
	    }.disposed(by: disposeBag)
	
	// 字典
	Observable<[String: Any]>.of(["name":"ydw","age":18])
	    .subscribe { (event) in
	        print(event)
	    }.disposed(by: disposeBag)
	
	// 数组
	Observable<[String]>.of(["A","B"])
	    .subscribe { (event) in
	        print(event)
	    }.disposed(by: disposeBag)
  • 无论字典、数组、多个元素都可以正常使用,初始化保存调度环境和传入的元素
    订阅流程也是利用 sink,然后通过 mutableIterator 迭代器处理发送:

四、from

  • 将可选序列转换为可观察序列。从集合中获取序列:数组、集合、set 获取序列,有可选项处理,更安全。
  • 从一个序列 (如Array/Dictionary/Set) 中创建一个 Observable 序列:
	Observable<[String]>.from(optional: ["A","B"])
	    .subscribe { (event) in
	        print(event)
	    }.disposed(by: disposeBag)
  • self._optional = optional 底层初始化可选项保存,订阅流程判断是否匹配我可选项;
  • 发送 observer.on(.next(element)) 序列 ,随即自动 observer.on(.completed) 完成序列发送。

五、deferred

  • 返回一个可观察序列,该序列在新观察者订阅时调用指定的工厂函数。
  • 使用 deferred() 方法延迟 Observable 序列的初始化,通过传入的 block 来实现 Observable 序列的初始化并且返回。
	var isOdd = true
	_ = Observable<Int>.deferred { () -> Observable<Int> in
	    isOdd = !isOdd
	    if isOdd {
	        return Observable.of(1,3,5,7,9)
	    }
	    return Observable.of(0,2,4,6,8)
	    }
	    .subscribe { (event) in
	        print(event)
	    }
  • self._observableFactory = observableFactory 初始化保存工厂闭包:
	func run() -> Disposable {
	    do {
	        let result = try self._observableFactory()
	        return result.subscribe(self)
	    }
	    catch let e {
	        self.forwardOn(.error(e))
	        self.dispose()
	        return Disposables.create()
	    }
	}
  • 在订阅流程到 sink 的时候,执行工厂闭包,有种中间层被包装的感觉:

六、rang

  • 使用指定的调度程序生成并发送观察者消息,生成指定范围内的可观察整数序列。
  • 创建一个 Observable 序列,它会发出一系列连续的整数,然后终止。
	Observable.range(start: 2, count: 5)
	    .subscribe { (event) in
	        print(event)
	    }.disposed(by: disposeBag)
	
	// 底层源码
	init(start: E, count: E, scheduler: ImmediateSchedulerType) {
	    self._start = start
	    self._count = count
	    self._scheduler = scheduler
	}
	
	override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
	    let sink = RangeSink(parent: self, observer: observer, cancel: cancel)
	    let subscription = sink.run()
	    return (sink: sink, subscription: subscription)
	}
  • 保存序列中第一个整数的值,要生成的顺序整数的数目以及调度环境。
	if i < self._parent._count {
	    self.forwardOn(.next(self._parent._start + i))
	    recurse(i + 1)
	}
	else {
	    self.forwardOn(.completed)
	    self.dispose()
	}
  • 根据之前保存的信息,数据的状态也不断攀升,然后递归到规定的要求:

七、generate

  • 通过运行产生序列元素的状态驱动循环,使用指定的调度程序运行循环,发送观察者消息,从而生成一个可观察序列。
  • 创建一个 Observable 序列,只要提供的条件值为 true 就可以生成值。
  • 初始值给定,然后判断条件 1 再判断条件2 会一直递归下去,直到条件 1 或者条件 2 不满足,类似数组遍历循环:
    • 参数一 initialState: 初始状态;
    • 参数二 condition:终止生成的条件(返回“false”时);
    • 参数三 iterate:迭代步骤函数;
    • 参数四 调度器:用来运行生成器循环的调度器,默认:CurrentThreadScheduler.instance;
    • 返回:生成的序列。
	Observable.generate(initialState: 0,// 初始值
	                    condition: { $0 < 10}, // 条件1
	                    iterate: { $0 + 2 })  // 条件2 +2
	    .subscribe { (event) in
	        print(event)
	    }.disposed(by: disposeBag)
	
	// 数组遍历
	let arr = ["A","B"]
	Observable.generate(initialState: 0,// 初始值
	    condition: { $0 < arr.count}, // 条件1
	    iterate: { $0 + 1 })  // 条件2 + 2
	    .subscribe(onNext: {
	        print("遍历arr:",arr[$0])
	    })
	    .disposed(by: disposeBag)

八、timer

  • 获取计时器 Observable 序列,返回一个可观察序列,该序列使用指定的调度程序运行计时器,在指定的初始相对到期时间过后定期生成一个值:
    • 第一次参数:第一次响应距离现在的时间;
    • 第二个参数:时间间隔;
    • 第三个参数:线程。
	Observable<Int>.timer(5, period: 2, scheduler: MainScheduler.instance)
	    .subscribe { (event) in
	        print(event)
	    }
	.disposed(by: disposeBag)
	
	// 因为没有指定期限period,故认定为一次性
	Observable<Int>.timer(1, scheduler: MainScheduler.instance)
	    .subscribe { (event) in
	        print("\\(event)")
	    }
	    .disposed(by: disposeBag)
  • 状态码的不断攀升,间隔时间不断发送响应:

九、interval

  • 返回一个可观察序列,该序列在每个周期之后生成一个值,使用指定的调度程序运行计时器并发送观察者消息:
	// 定时器
	Observable<Int>.interval(1, scheduler: MainScheduler.instance)
	    .subscribe { (event) in
	        print(event)
	    }
	    .disposed(by: disposeBag)

十、repeatElement

  • 使用指定的调度程序发送观察者消息,生成无限重复给定元素的可观察序列:
	Observable<Int>.repeatElement(5)
	    .subscribe { (event) in
	        print("订阅:",event)
	    }
	    .disposed(by: disposeBag)

十一、error

  • 创建一个不会发送任何条目并且立即终止错误的 Observable 序列,返回一个以“error”结束的可观察序列。
  • error 序列平时在开发也比较常见,请求网络失败也会发送失败信号:
	// 对消费者发出一个错误信号
	Observable<String>.error(NSError.init(domain: "dwerror", code: 10086, userInfo: ["reason":"unknow"]))
	    .subscribe { (event) in
	        print("订阅:",event)
	    }
	    .disposed(by: disposeBag)

十二、never

  • never 返回一个以“error”结束的可观察序列,这个序列平时在开发也比较常见,请求网络失败也会发送失败信号:
	Observable<String>.never()
	    .subscribe { (event) in
	        print("never:",event)
	    }
	    .disposed(by: disposeBag)

十三、create()

  • 该方法接受一个 闭包形式的参数,任务是对每一个过来的订阅进行处理。这也是序列创建的一般方式,应用非常之多:
	let observable = Observable<String>.create{observer in
	    // 对订阅者发出了.next事件,且携带了一个数据"ydw"
	    observer.onNext("ydw")
	    // 对订阅者发出了.completed事件
	    observer.onCompleted()
	    // 因为一个订阅行为会有一个Disposable类型的返回值,所以在结尾一定要returen一个Disposable
	    return Disposables.create()
	}
	 
	// 订阅测试
	observable.subscribe {
	    print($0)
	}

以上是关于RxSwift之深入解析Observable序列的创建的主要内容,如果未能解决你的问题,请参考以下文章

RxSwift之深入解析场景特征序列的使用和底层实现

RxSwift之深入解析map操作符的底层实现

RxSwift之深入解析Subject的使用和实现原理

RxSwift之深入解析Using操作的应用和原理

RxSwift之深入解析特殊序列deallocating与deallocated的源码实现

RxSwift 从 Observable 序列中的一项获取值