处置(取消)可观察的。 SubscribeOn 和 observeOn 不同的调度器

Posted

技术标签:

【中文标题】处置(取消)可观察的。 SubscribeOn 和 observeOn 不同的调度器【英文标题】:Dispose (cancel) observable. SubscribeOn and observeOn different schedulers 【发布时间】:2018-08-15 10:21:58 【问题描述】:

修改后的问题

我已经修改了我的问题。对于普通情况。

我想在后台线程中使用 RxSwift 生成项目(从磁盘加载、长时间运行的计算等),并在 MainThread 中观察项目。而且我想确保在 dispose(从主线程)之后不会交付任何物品。

根据文档 (https://github.com/ReactiveX/RxSwift/blob/master/Documentation/GettingStarted.md#disposing):

那么这段代码可以在 dispose 调用执行后打印一些东西吗?答案是:视情况而定。

如果调度程序是串行调度程序(例如 MainScheduler)并且在同一个串行调度程序上调用 dispose,则答案是否定的。

否则是。

但是,如果将 subscribeOn 和observerOn 与不同的调度程序一起使用 - 我们不能保证在 dispose 之后不会发出任何内容(手动或通过 dispose bag,没关系)。

我应该如何在后台生成项目(例如图像)并确保处理后不会使用结果?

我在实际项目中做了解决方法,但我想解决这个问题并了解在相同情况下我们应该如何避免它。

在我的测试项目中,我使用了一小段时间——它们完美地展示了问题!

import RxSwift

class TestClass 
    private var disposeBag = DisposeBag()

    private var isCancelled = false

    init(cancelAfter: TimeInterval, longRunningTaskDuration: TimeInterval) 
        assert(Thread.isMainThread)

        load(longRunningTaskDuration: longRunningTaskDuration)

        DispatchQueue.main.asyncAfter(deadline: .now() + cancelAfter)  [weak self] in
            self?.cancel()
        
    

    private func load(longRunningTaskDuration: TimeInterval) 
        assert(Thread.isMainThread)

        // We set task not cancelled
        isCancelled = false

        DataService
            .shared
            .longRunngingTaskEmulation(sleepFor: longRunningTaskDuration)
            // We want long running task to be executed in background thread
            .subscribeOn(ConcurrentDispatchQueueScheduler.init(queue: .global()))
            // We want to process result in Main thread
            .observeOn(MainScheduler.instance)
            .subscribe(onSuccess:  [weak self] (result) in
                assert(Thread.isMainThread)

                guard let strongSelf = self else 
                    return
                

                if !strongSelf.isCancelled 
                    print("Should not be called! Task is cancelled!")
                 else 
                    // Do something with result, set image to UIImageView, for instance
                    // But if task was cancelled, this method will set invalid (old) data
                    print(result)
                

                , onError: nil)
            .disposed(by: disposeBag)
    

    // Cancel all tasks. Can be called in PreapreForReuse.
    private func cancel() 
        assert(Thread.isMainThread)

        // For test purposes. After cancel, old task should not make any changes.
        isCancelled = true

        // Cancel all tasks by creating new DisposeBag (and disposing old)
        disposeBag = DisposeBag()
    


class DataService 
    static let shared = DataService()

    private init()  

    func longRunngingTaskEmulation(sleepFor: TimeInterval) -> Single<String> 
        return Single
            .deferred 
                assert(!Thread.isMainThread)

                // Enulate long running task
                Thread.sleep(forTimeInterval: sleepFor)

                // Return dummy result for test purposes.
                return .just("Success")
        
    


class MainClass 
    static let shared = MainClass()

    private init()  

    func main() 

        Timer.scheduledTimer(withTimeInterval: 0.150, repeats: true)  [weak self] (_) in
            assert(Thread.isMainThread)

            let longRunningTaskDuration: TimeInterval = 0.050

            let offset = TimeInterval(arc4random_uniform(20)) / 1000.0
            let cancelAfter = 0.040 + offset

            self?.executeTest(cancelAfter: cancelAfter, longRunningTaskDuration: longRunningTaskDuration)
        
    

    var items: [TestClass] = []
    func executeTest(cancelAfter: TimeInterval, longRunningTaskDuration: TimeInterval) 
        let item = TestClass(cancelAfter: cancelAfter, longRunningTaskDuration: longRunningTaskDuration)
        items.append(item)
    

在某处调用 MainClass.shared.main() 以开始。

我们调用方法来加载一些数据,然后我们调用取消(全部来自主线程)。取消后我们有时会收到结果(也在主线程中),但它已经很旧了。

在实际项目中TestClass 是一个UITableViewCell 的子类,并且在prepareForReuse 中调用了cancel 方法。然后单元被重用,新数据被设置到单元。后来我们得到了OLD任务的结果。旧图像设置为单元格!


原始问题(旧):

我想在 ios 中使用 RxSwift 加载图像。我想在后台加载图像,并在主线程中使用它。所以我订阅了后台线程,并观察了主线程。函数将如下所示:

func getImage(path: String) -> Single<UIImage> 
    return Single
        .deferred 
            if let image = UIImage(contentsOfFile: path) 
                return Single.just(image)
             else 
                return Single.error(SimpleError())
            
        
        .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
        .observeOn(MainScheduler.instance)

但是我遇到了取消的问题。因为不同的调度器用于创建项目和调用 dispose(从主线程释放),所以可以在调用 dispose 后引发订阅事件。因此,在我在 UITableViewCell 中使用的情况下,我收到无效(旧)图像。

如果我在观察(主线程)的同一调度程序中创建项目(加载图像),一切正常! 但我想在后台加载图像,我希望在处理后将其取消(在 prepareForReuse 方法或新路径设置方法中)。常见的模板是什么?

编辑:

我创建了一个测试项目,我可以在 dispose 后收到事件时模拟问题。

我有一个可行的简单解决方案。我们应该在同一个调度器中发出项目。所以我们应该捕获调度程序并在那里发出项目(在长时间运行的任务完成后)。

func getImage2(path: String) -> Single<UIImage> 
    return Single
        .create(subscribe:  (single) -> Disposable in
            // We captrure current queue to execute callback in
            // TODO: It can be nil if called from background thread
            let callbackQueue = OperationQueue.current

            // For async calculations
            OperationQueue().addOperation 
                // Perform any long-running task
                let image = UIImage(contentsOfFile: path)

                // Emit item in captured queue
                callbackQueue?.addOperation 
                    if let result = image 
                        single(.success(result))
                     else 
                        single(.error(SimpleError()))
                    
                
            

            return Disposables.create()
        )
        .observeOn(MainScheduler.instance)

但它不是以 Rx 方式。而且我认为这不是最好的解决方案。

也许我应该使用 CurrentThreadScheduler 来发出项目,但我不明白如何。是否有任何使用调度程序生成项目的教程或示例?我没有找到。

【问题讨论】:

我怀疑您的问题出在其他地方。一旦你取消订阅一个 observable(通过调用 dispose 其一次性),即使后台线程 did 完成加载,你也无法获得带有图像的 .next 事件。 在发射元素后立即调用dispose会出现问题。看起来项目已排队到主队列,当我们调用 dispose(在主线程中)时,新项目已经在队列中。所以它是在调用 dispose 之后收到的。 手动调用dispose的用例是什么? 我在 UITableViewCell 的子类中使用 DisposeBag。我在 prepareForReuse 方法和单元格的 set(_:) 方法中创建了新的 DisposeBag。我试过 DisposeBag 和手动 dispose,结果是一样的。 【参考方案1】:

有趣的测试用例。有个小bug,应该是if strongSelf.isCancelled而不是if !strongSelf.isCancelled。除此之外,测试用例显示了问题。

我直观地期望它在发射之前检查是否已经发生了处置,如果它发生在同一个线程上。

我还发现了这个:

只是为了说明这一点,如果您在一个线程上调用 dispose(例如 main),您不会在同一线程上观察到任何元素。这是一个 保证。

请看这里:https://github.com/ReactiveX/RxSwift/issues/38

所以也许这是一个错误。

为了确保我在这里打开了一个问题: https://github.com/ReactiveX/RxSwift/issues/1778

更新

看来这实际上是一个错误。同时,RxSwift 的优秀人员已经确认并幸运地很快修复了它。请参阅上面的问题链接。

测试

该错误已通过提交bac86346087c7e267dd5a620eed90a7849fd54ff 修复。因此,如果您使用的是 CocoaPods,您可以简单地使用以下内容进行测试:

target 'RxSelfContained' do
  use_frameworks!
  pod 'RxAtomic', :git => 'https://github.com/ReactiveX/RxSwift.git', :commit => 'bac86346087c7e267dd5a620eed90a7849fd54ff'
  pod 'RxSwift', :git => 'https://github.com/ReactiveX/RxSwift.git', :commit => 'bac86346087c7e267dd5a620eed90a7849fd54ff'
end

【讨论】:

非常感谢您为解决我的问题所做的工作。我已经用最新版本的 RxSwift (4.4.0) 进行了测试,效果很好!

以上是关于处置(取消)可观察的。 SubscribeOn 和 observeOn 不同的调度器的主要内容,如果未能解决你的问题,请参考以下文章

连接两个都具有 subscribeOn 的可观察序列。如何确保我的 observable 在线程上运行?

如何在特定队列上启动一系列可观察对象?

RxJava 观察调用/订阅线程

使用 takeUntil 和 combineLatest 取消订阅可观察的 rxjs 不起作用

嵌套可观察订阅问题,无法取消订阅

如何“取消处置”动画控制器以供重用?