Combine 的 receive(on:) 没有分派到串行队列,导致数据竞争

Posted

技术标签:

【中文标题】Combine 的 receive(on:) 没有分派到串行队列,导致数据竞争【英文标题】:Combine's receive(on:) not dispatching to serial queue, causing data race 【发布时间】:2021-07-11 16:22:03 【问题描述】:

根据 Apple 的说法,receive(on:options:) 在给定队列上运行回调。我们使用串行调度队列来防止在下面的代码中对localOptionalCancellable 进行竞速。但是receiveCancel 没有被分派到该队列。谁能告诉我为什么?

从文档中,

您使用 receive(on:options:) 运算符接收特定调度程序的结果和完成,例如在主运行循环上执行 UI 工作。

...

在订阅者中执行工作时,首选接收(on:options:) 而不是显式使用调度队列。例如,而不是以下模式:

问题再现:

import Foundation
import Combine

class Example 
    private var localOptionalCancellable: AnyCancellable?
    private let dispatchQueue = DispatchQueue(label: "LocalQueue-\(UUID())")
    
    func misbehavingFunction() 
        self.dispatchQueue.async 
            self.localOptionalCancellable = Just(())
                .setFailureType(to: Error.self)
                .receive(on: self.dispatchQueue)
                .handleEvents(
                    receiveCancel: 
                        // Simultaneous accesses to 0x600000364e10, but modification requires exclusive access.
                        // Can be fixed by wrapping in self.dispatchQueue.async 
                        self.localOptionalCancellable = nil
                    
                )
                .sink(
                    receiveCompletion:  _ in ,
                    receiveValue:  _ in
                        self.localOptionalCancellable = nil
                    
                )
        
    


Example().misbehavingFunction()

堆栈跟踪:

Simultaneous accesses to 0x600000364e10, but modification requires exclusive access.
Previous access (a modification) started at  (0x10eeaf12a).
Current access (a modification) started at:
0    libswiftCore.dylib                 0x00007fff2ff7be50 swift_beginAccess + 568
3    Combine                            0x00007fff4ba73a40 Publishers.HandleEvents.Inner.cancel() + 71
4    Combine                            0x00007fff4ba74230 protocol witness for Cancellable.cancel() in conformance Publishers.HandleEvents<A>.Inner<A1> + 16
5    Combine                            0x00007fff4b9f10c0 Subscribers.Sink.cancel() + 652
6    Combine                            0x00007fff4b9f1500 protocol witness for Cancellable.cancel() in conformance Subscribers.Sink<A, B> + 16
7    Combine                            0x00007fff4b9dd2d0 AnyCancellable.cancel() + 339
8    Combine                            0x00007fff4b9dd5f0 AnyCancellable.__deallocating_deinit + 9
9    libswiftCore.dylib                 0x00007fff2ff7da20 _swift_release_dealloc + 16
13   Combine                            0x00007fff4b9f0da0 Subscribers.Sink.receive(_:) + 54
14   Combine                            0x00007fff4b9f14c0 protocol witness for Subscriber.receive(_:) in conformance Subscribers.Sink<A, B> + 16
15   Combine                            0x00007fff4ba73ed0 Publishers.HandleEvents.Inner.receive(_:) + 129
16   Combine                            0x00007fff4ba74170 protocol witness for Subscriber.receive(_:) in conformance Publishers.HandleEvents<A>.Inner<A1> + 16
17   Combine                            0x00007fff4ba26440 closure #1 in Publishers.ReceiveOn.Inner.receive(_:) + 167
18   libswiftDispatch.dylib             0x000000010e97cad0 thunk for @escaping @callee_guaranteed () -> () + 14
19   libdispatch.dylib                  0x00007fff20105323 _dispatch_call_block_and_release + 12
20   libdispatch.dylib                  0x00007fff20106500 _dispatch_client_callout + 8
21   libdispatch.dylib                  0x00007fff2010c12e _dispatch_lane_serial_drain + 715
22   libdispatch.dylib                  0x00007fff2010cde1 _dispatch_lane_invoke + 403
23   libdispatch.dylib                  0x00007fff20117269 _dispatch_workloop_worker_thread + 782
24   libsystem_pthread.dylib            0x00007fff6116391b _pthread_wqthread + 290
25   libsystem_pthread.dylib            0x00007fff61162b68 start_wqthread + 15
Fatal access conflict detected.

【问题讨论】:

这是一种我从未见过的非常复杂的使用 Combine 的方式。你为什么在回调中更新localOptionalCancellable?你想达到什么目的? localOptionalCancellable 用于跟踪当前正在执行的调用并将它们限制为仅 1 个。 看到你的回答,我不确定这会有什么帮助。但按照目前的代码,我们并没有得到 Apple 记录的预期结果。 您能后退十步,解释一下您真正想要做什么吗? 接收到的队列和运行代码的队列是有区别的。 【参考方案1】:

根据 Apple 的说法,receive(on:options:) 在给定队列上运行回调。

不完全是。以下是文档的实际内容:

您使用receive(on:options:) 运算符接收特定调度程序的结果完成,例如在主运行循环上执行 UI 工作。与影响上游消息的subscribe(on:options:) 相比,receive(on:options:) 改变了下游消息的执行上下文。

(添加了重点。)所以receive(on:) 控制用于调用Subscriberreceive(_:)receive(completion:) 方法的Scheduler。它控制用于调用Subscriptionrequest(_:)cancel() 方法的Scheduler

要控制用于调用Subscriptioncancel()方法的Scheduler,需要在handleEvents操作符的下游使用subscribe(on:options:)操作符,像这样:

            self.localOptionalCancellable = Just(())
                .setFailureType(to: Error.self)
                .receive(on: self.dispatchQueue)
                .handleEvents(
                    receiveCancel: 
                        // Simultaneous accesses to 0x600000364e10, but modification requires exclusive access.
                        // Can be fixed by wrapping in self.dispatchQueue.async 
                        self.localOptionalCancellable = nil
                    
                )
                .subscribe(on: self.dispatchQueue)
             // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
                .sink(
                    receiveCompletion:  _ in ,
                    receiveValue:  _ in
                        self.localOptionalCancellable = nil
                    
                )

【讨论】:

谢谢!但是你是怎么弄清楚这些东西的顺序的呢?我尝试将subscribe 放在handleEvents 之前,但由于同样的原因仍然会导致崩溃。 您可以将输出和完成视为从初始发布者 (Just) 向下传递到订阅者 (sink),将取消视为从订阅者向上传递到发布者。如果取消在到达handleEvents 之前没有通过subscribe(on:),那么subscribe(on:) 不会对handleEvents 的调用方式产生任何影响。 我尝试在我的实际代码中使用它。但有趣的是,如果我从另一个函数将sink 返回的AnyCancellable 设置为nil。或者使用传递给subscribe()Subscribers.Sink&lt;&gt; 对象并从另一个函数调用它的cancel()receiveCancel 回调不会运行。如果链中没有subscribe(on:options:),它就会运行。 完成可能已经通过handleEvents沿链向下传播,导致cancellable无效。 我怎样才能避免这种情况发生?

以上是关于Combine 的 receive(on:) 没有分派到串行队列,导致数据竞争的主要内容,如果未能解决你的问题,请参考以下文章

在 Combine 的 `assign(to:on:)` 主题中分配给 @State 不会导致视图更新

Receive a message from AAA of cutting user on华为设备

Swift Combine:没有“distinct”运算符?

DatagramSocket receive() 没有接收到任何数据

swiftui combine 无法获取数据 [关闭]

avcodec_receive_packet()没有看到输出