RxSwift - 来自 Singleton 的不同类型的队列 observables

Posted

技术标签:

【中文标题】RxSwift - 来自 Singleton 的不同类型的队列 observables【英文标题】:RxSwift - Queue observables of different type from Singleton 【发布时间】:2018-05-17 10:55:16 【问题描述】:

我有一个带有一些公共可用功能的单例类(实际上它会进行网络抓取,但在这里进行了简化)。这些函数都返回一个Single<T>,但类型不同。

它可能看起来像这样:

class Singleton 
    static let shared = Singleton()
    private init()  

    func doSomethingInt() -> Single<Int> 
        return Single.just(1)
            .delay(3, scheduler: MainScheduler.instance)
    

    func doSomethingString() -> Single<String> 
        return Single.just("Wow")
            .delay(3, scheduler: MainScheduler.instance)
    

当有人调用Singleton.shared.doSomthingInt() 时,该函数应该放在一个队列中,直到它通过队列才执行。队列中的下一个 observable 不应该在它完成之前开始执行。理想情况下,Singleton 会有一个函数,它会延迟传递给它的每个函数的执行。像这样的:

private func placeInQueue<T: Any>(operation: Single<T>) -> Single<T> 
    // place in some magic shared queue
    return operation

然后我可以将这个函数链接到应该放在队列中的函数的开头,如下所示:

func doSomethingString() -> Single<String> 
    let operation = Single.just("Wow")
        .delay(3, scheduler: MainScheduler.instance)
    return placeInQueue(operation)

我觉得这应该可以通过concat 操作以某种方式实现,但我还没有能够解决它。

有什么线索吗?

【问题讨论】:

【参考方案1】:

我已经创建了这个类,它似乎正在工作:)

也许这是它运行的调度程序的问题。至少,如果我在队列中添加一些操作,每个操作在MainScheduler 上都有 3 秒的延迟,我可以看到一些排队的操作最终在前一个操作之后或多或少 3.5~4 秒后完成已完成。不过对我来说这不是什么大问题:)

class ObservableQueue 
    init()  

    private var queueArray = [(operation: Observable<Void>, id: Double)]()

    /// Adding the `operation` to an internal queue. Starts execution of the `operation` when all previous operations in the queue had sendt an stop event.
    func placeInQueue<T: Any>(_ operation: Single<T>) -> Single<T> 
        let operationId = createId()
        let queuedOperation = currentQueue()
            .flatMap  _ -> Single<T> in
                return operation
            
            .do(
                onNext:  [weak self] _ in self?.removeFromQueue(id: operationId) ,
                onError:  [weak self] _ in self?.removeFromQueue(id: operationId)
            )
        let queueableOperation = operation
            .map  _ in return () 
            .asObservable()
            .catchErrorJustReturn(())
        addToQueue(queueableOperation, id: operationId)
        return queuedOperation
    

    private func createId() -> Double 
        var operationId: Double = Date().timeIntervalSince1970
        while (queueArray.contains  $0.id == operationId ) 
            operationId = Date().timeIntervalSince1970
        
        return operationId
    

    private func currentQueue() -> Single<Void> 
        var queue = queueArray.map $0.operation 
        if queue.isEmpty 
            queue = [Observable.just(())]
        
        return Observable.concat(queue).takeLast(1).asSingle()
    

    private func addToQueue(_ operation: Observable<Void>, id: Double) 
        queueArray.append((operation: operation, id: id))
    

    private func removeFromQueue(id: Double) 
        guard let index = (queueArray.index  $0.id == id ) else  return 
        queueArray.remove(at: index)
    

我是这样使用它的:

private let queue = ObservableQueue()

func doSomethingInt() -> Single<Int> 
    let operation = Single.just(1)
        .delay(3, scheduler: MainScheduler.instance)
    return queue.placeInQueue(operation)

我希望这对某人有帮助:) 请随时发布对此解决方案的疑虑,或者如果您有任何更好的解决方案。

【讨论】:

以上是关于RxSwift - 来自 Singleton 的不同类型的队列 observables的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 RxSwift 在一个地方捕获来自两个请求的错误

RxSwift - 来自可观察到的忽略错误的驱动程序

在 RxSwift 中具有来自 Observable 的最后两个元素的运算符

如何使用 RxSwift 和 Alamofire 库调用来自另一个 API 的响应的 API?

RxSwift throttle() 获取第一个元素

iOS RxSwift - 如何“断开”一个 observable?