RxSwift 跟踪多个 observables 活动

Posted

技术标签:

【中文标题】RxSwift 跟踪多个 observables 活动【英文标题】:RxSwift track multiple observables activity 【发布时间】:2017-09-24 10:26:36 【问题描述】:

我有以下代码:

let categoriesRequest = APIService.MappableRequest(Category.self, resource: .categories)
let categoriesRequestResult = api.subscribeArrayRequest(categoriesRequest, from: actionRequest, disposedBy: disposeBag)

let newCategories = categoriesRequestResult
    .map  $0.element 
    .filterNil()

let categoriesUpdateData = DatabaseService.UpdateData(newObjectsObservable: newCategories)
let categoriesDatabaseResult = database.subscribeUpdates(from: categoriesUpdateData, disposedBy: disposeBag)

let latestTopicsRequest = APIService.MappableRequest(Topic.self, resource: .latestTopics)
let latestTopicsRequestResult = api.subscribeArrayRequest(latestTopicsRequest, from: actionRequest, disposedBy: disposeBag)

let newLastTopics = latestTopicsRequestResult
    .map  $0.element 
    .filterNil()

let latestTopicsUpdateData = DatabaseService.UpdateData(newObjectsObservable: newLastTopics)
let latestTopicsDatabaseResult = database.subscribeUpdates(from: latestTopicsUpdateData, disposedBy: disposeBag)

有两个请求从同一个发布主题 actionRequest 开始,并且在这些请求之后有两个数据库更新。

我需要类似 isActive 的布尔值,如果任何 api/数据库任务正在进行,它将返回 true。是否可以?我在 RxSwift 示例中看到了ActivityIndicator,但我不知道在我的情况下是否可以使用它。

如果需要,来自 api/数据库的代码:

// API
func subscribeArrayRequest<T>(_ request: MappableRequest<T>,
                              from triggerObservable: Observable<Void>,
                              disposedBy disposeBag: DisposeBag) -> Observable<Event<[T]>> 
    let result = ReplaySubject<Event<[T]>>.create(bufferSize: 1)

    triggerObservable
        .flatMapLatest 
            SessionManager
                .jsonArrayObservable(with: request.urlRequest, isSecured: request.isSecured)
                .mapResponse(on: APIService.mappingSheduler)  Mapper<T>().mapArray(JSONArray: $0) 
                .materialize()
        
        .subscribe(onNext:  [weak result] event in
            result?.onNext(event)
        )
        .disposed(by: disposeBag)

    return result


// Database
func subscribeUpdates<N, P>(from data: UpdateData<N, P>, disposedBy disposeBag: DisposeBag) -> Observable<Void> 
    let result = PublishSubject<Void>()

    data.newObjectsObservable
        .observeOn(DatabaseService.writingSheduler)
        .subscribe(onNext:  [weak result] newObjects in
            // update db

            DispatchQueue.main.async 
                result?.onNext(())
            
        )
        .disposed(by: disposeBag)

    return result

谢谢。

【问题讨论】:

【参考方案1】:

找到以下解决方案:将ActivityIndicator 实例传递给服务方法并像这样使用它:

func bindArrayRequest<T>(_ request: MappableRequest<T>,
                        from triggerObservable: Observable<Void>,
                        trackedBy indicator: RxActivityIndicator,
                        disposedBy disposeBag: DisposeBag) -> Observable<Event<[T]>> 
    let result = ReplaySubject<Event<[T]>>.create(bufferSize: 1)

    triggerObservable
        .flatMapLatest 
            SessionManager
                .jsonArrayObservable(with: request.urlRequest, isSecured: request.isSecured)
                .mapResponse(on: APIService.mappingSheduler)  Mapper<T>().mapArray(JSONArray: $0) 
                .materialize()
                .trackActivity(indicator)
        
        .bind(to: result)
        .disposed(by: disposeBag)

    return result

还将数据库更新逻辑包装到 observable 中,并将其与 .trackActivity(indicator) 一起使用到 flatMapLatest 中,就像在 api 中一样。

然后像这样使用 api/database 方法:

let indicator = RxActivityIndicator()
isUpdatingData = indicator.asObservable() // needed bool observable

let categoriesRequest = APIService.MappableRequest(Category.self, resource: .categories)

let categoriesRequestResult = 
    api.bindArrayRequest(categoriesRequest, 
                         from: actionRequest,
                         trackedBy: indicator,
                         disposedBy: disposeBag)

let newCategories = categoriesRequestResult
    .map  $0.element 
    .filterNil()

let categoriesUpdateData = DatabaseService.UpdateData(newObjectsObservable: newCategories)

let categoriesDatabaseResult =
    database.bindUpdates(from: categoriesUpdateData,
                         trackedBy: indicator,
                         disposedBy: disposeBag)

let latestTopicsRequest = APIService.MappableRequest(Topic.self, resource: .latestTopics)

let latestTopicsRequestResult =
    api.bindArrayRequest(latestTopicsRequest,
                         from: actionRequest,
                         trackedBy: indicator,
                         disposedBy: disposeBag)

let newLastTopics = latestTopicsRequestResult
    .map  $0.element 
    .filterNil()

let latestTopicsUpdateData = DatabaseService.UpdateData(newObjectsObservable: newLastTopics)

let latestTopicsDatabaseResult = 
    database.bindUpdates(from: latestTopicsUpdateData,
                         trackedBy: indicator,
                         disposedBy: disposeBag)

isUpdatingData observable 是我所需要的。

【讨论】:

以上是关于RxSwift 跟踪多个 observables 活动的主要内容,如果未能解决你的问题,请参考以下文章

如何在 RxSwift 中组合多个 Observable

RxSwift 对一个 observable 的多个订阅

RxSwift 从一个创建多个 Observable

RxSwift 系列

使用 RxSwift 的异步任务和多个观察者

RxSwift 系列