observable.observeOn() 和公共资源的并发问题

Posted

技术标签:

【中文标题】observable.observeOn() 和公共资源的并发问题【英文标题】:Concurrency issues with observable.observeOn() and common resources 【发布时间】:2020-05-27 09:54:04 【问题描述】:

我在函数内部有一个 observable。 该函数发生在某个队列 queueA 中,并且 observable 是通过 observeOn(schedulerB) 订阅的。在 onNext 中,我正在更改一个类变量。

在另一个函数中,我正在从不同的队列中更改同一个类变量。

这里有一些代码来演示我的情况:

class SomeClass 

    var commonResource: [String: String] = [:]
    var queueA = DispatchQueue(label: "A")
    var queueB = DispatchQueue(label: "B")
    var schedulerB = ConcurrentDispatchQueueScheduler(queue: QueueB)       

    func writeToResourceInOnNext() 
        let obs: PublishSubject<String> = OtherClass.GetObservable()
        obs.observeOn(schedulerB)
           .subscribe(onNext:  [weak self] res in
            // this happens on queue B
            self.commonResource["key"] = res
        
    

    func writeToResource() 
        // this happens on queue A
        commonResource["key"] = "otherValue"
    

我的问题是,如果commonResource同时在两个地方修改,会不会有并发问题?

在使用 observeOn 的 observable 中写入/读取 onNext 内的类/全局变量的常见做法是什么?

谢谢大家!

【问题讨论】:

【参考方案1】:

由于您的 SomeClass 无法控制何时调用这些函数或在哪些线程上回答是肯定的,因此由于其被动性质,您在此代码中设置了并发问题。

这里明显的解决方案是调度到writeToResource() 内的队列 B 以避免竞争条件。

另一种选择是使用NSLock(或NSRecursiveLock)并在写入资源之前锁定它,然后再解锁。

最佳实践是:当订阅函数的闭包内部发生副作用时(在这种情况下,写信给commonResource,闭包是唯一发生副作用的地方。这个这意味着取消被动的writeToResource() 函数,而是传入一个由当前调用该函数的任何代码生成的 Observable。

【讨论】:

谢谢!这很有意义。我不确定我是否理解你的最后一段。 某事调用writeToResource() 以响应某些操作。而不是SomeClass 被动地坐在那里等待该函数被调用,它应该主动订阅该事件是什么。在实践中,这可能意味着注入一个表示 SomeClass 订阅的操作的 Observable。这不是可以在 SO 问题的上下文中很好解释的东西。如果可以的话,请加入我们rxslack.herokuapp.com...

以上是关于observable.observeOn() 和公共资源的并发问题的主要内容,如果未能解决你的问题,请参考以下文章

android app beta版和公版可以存在于同一部手机吗?

Layer2跨链和公链三足鼎立,谁将实现区块链的可扩展梦想?

DB9 接口定义

JavaBean

公钥密码学

司法考试复习总论