合并:发布者有时会失去价值并完成
Posted
技术标签:
【中文标题】合并:发布者有时会失去价值并完成【英文标题】:Combine: Publisher sometimes loses value and completes 【发布时间】:2020-02-14 09:01:41 【问题描述】:我有一个简单的Deferred Publisher
,它从磁盘读取数据并将数据显示在SwiftUI List
中,Publisher
在大多数情况下都能正常工作,但有时 它表现不佳,它只是失去了它的价值(这是一个Model
对象的数组)并以finished
消息完成。我尝试了提到here 的解决方法来使用buffer
运算符将值保留在缓冲区中,因为我相信Combine's Publisher
在设计上不会将数据传递到下游,如果订阅者没有请求并因此丢弃此数据并完成,但是使用 buffer
并没有解决问题。
我的代码:
enum FileError: Error
case someError
class ViewModel: ObservableObject
@Published var modelArray = [Model]()
private var subscriptions = Set<AnyCancellable>()
func readData()
DataSource()
.readFromBundle(resource: "Sample", type: "json")
.receive(on: DispatchQueue.main)
.sink(receiveCompletion: completion in
print("Completion: \(completion)")
) array in
self.modelArray = array
.store(in: &subscriptions)
struct ContentView: View
@ObservedObject var viewModel: ViewModel
var body: some View
VStack
List(self.viewModel.modelArray) model in
Text("\(model.name)")
.onAppear
self.viewModel.readData()
struct Model: Codable, Identifiable
var id: Int
var name: String
class DataSource
private let readQueue = DispatchQueue(label: "ReadQueue", qos: .default, attributes: .concurrent)
func readFromBundle (resource: String, type:String) -> AnyPublisher<[Model], FileError>
Deferred
Future promise in
guard let url = Bundle.main.url(forResource: "Sample", withExtension: "json"),
let data = try? Data(contentsOf: url),
let modelArray = try? JSONDecoder().decode([Model].self, from: data)
else
promise(.failure(.someError))
return
promise(.success(modelArray))
.receive(on: self.readQueue)
.eraseToAnyPublisher()
This is a link 下载一个工作示例项目。
编辑:
环境:Xcode 11.3.1、ios 13.3 iPhone 11 Pro Max 模拟器和设备。
gif 截图(注意控制台输出)
EDIT2:
如果我添加任何下游发布者,例如combineLatest
,例如在消费者函数readData()
中的sink
之前,则会引入一个新行为,它将异步发布者(readFromBundle)与同步发布者(combineLatest
)链接起来) 将导致该值根本无法在iOS 13.3+
设备上传递,有时会在iOS 13.3
以下的设备上传递,如this link 所述。
【问题讨论】:
您的链接项目中是否出现错误?我启动了几次,它成功了...... 这不是一个错误,它只是因为发布者完成而没有显示数据,是的,它发生在链接的项目中,但并非总是如此(90% 的尝试都是成功的数据) 是的,这就是我的意思...但我开始时从未发生过... 也许你应该给出“真实”的身份...因为你的身份是双 1...5 你在测试什么 ios 版本?我什至现在尝试使用重复计时器,它每次都在 ios 13.2 上工作 【参考方案1】:看起来像是赛车问题,请尝试以下(仅通过代码阅读)
1) 显式使用后台队列
private let readQueue = DispatchQueue(label: "ReadQueue", qos: .background,
attributes: .concurrent)
2) 在此队列上安排发布者而不是在其上接收
.subscribe(on: self.readQueue)
【讨论】:
调度而不是接收成功了,但不明白为什么。现在它表现良好,没有一次尝试在没有值的情况下完成。【参考方案2】:让我们看看关于.receive(on:)
的文档
指定从发布者接收元素的调度程序。 声明
func receive<S>(on scheduler: S, options: S.SchedulerOptions? = nil) -> Publishers.ReceiveOn<Publishers.SubscribeOn<Deferred<Future<[Model], FileError>>, DispatchQueue>, S> where S : Scheduler
讨论
您使用receive(on:options:)
运算符接收特定调度程序的结果,例如在主运行循环上执行 UI 工作。与影响上游消息的subscribe(on:options:)
相比,receive(on:options:)
改变了下游消息的执行上下文。在以下示例中,对 jsonPublisher 的请求在 backgroundQueue 上执行,但从它接收的元素在 RunLoop.main 上执行。
let jsonPublisher = MyJSONLoaderPublisher() // Some publisher.
let labelUpdater = MyLabelUpdateSubscriber() // Some subscriber that updates the UI.
jsonPublisher
.subscribe(on: backgroundQueue)
.receiveOn(on: RunLoop.main)
.subscribe(labelUpdater)
参数
调度器 发布者用于元素交付的调度程序。 选项 自定义元素交付的调度程序选项。 返回
使用指定调度程序交付元素的发布者。
你的意思是
import SwiftUI
import Combine
enum FileError: Error
case someError
class ViewModel: ObservableObject
@Published var modelArray = [Model]()
private var subscriptions = Set<AnyCancellable>()
func readData()
DataSource()
.readFromBundle(resource: "Sample", type: "json")
.sink(receiveCompletion: completion in
print("Completion: \(completion)")
) array in
print("received value")
self.modelArray = array
.store(in: &subscriptions)
struct ContentView: View
@ObservedObject var viewModel: ViewModel
var body: some View
VStack
List(self.viewModel.modelArray) model in
Text("\(model.name)")
.onAppear
self.viewModel.readData()
struct Model: Codable, Identifiable
var id: Int
var name: String
class DataSource
private let readQueue = DispatchQueue(label: "ReadQueue", qos: .default, attributes: .concurrent)
func readFromBundle (resource: String, type:String) -> AnyPublisher<[Model], FileError>
Deferred
Future promise in
guard let url = Bundle.main.url(forResource: "Sample", withExtension: "json"),
let data = try? Data(contentsOf: url),
let modelArray = try? JSONDecoder().decode([Model].self, from: data)
else
promise(.failure(.someError))
return
promise(.success(modelArray))
.subscribe(on: readQueue)
.receive(on: RunLoop.main)
.eraseToAnyPublisher()
这解释了为什么 Asperi 的解决方案有效。不同的是,在readData()
中没有必要再次调用 .receive(on:)
DispatchQueue.main
和 RunLoop.main
之间的区别在您的示例中并不显着。
【讨论】:
【参考方案3】:第一次运行不会失败,它只是“需要”时间来加载它......你可以通过添加这个来检查这个。
print("ready")
promise(.success(modelArray))
然后将断点设置为“尚未加载”,您将看到“尚未加载”出现在控制台中打印“就绪”之前。这不是出版商的一滴水。
正如 onAppear() 所说,它将在 UI 显示后被调用....
if self.viewModel.modelArray.count == 0
Text("not loaded yet")
else
List(self.viewModel.modelArray) model in
Text("\(model.name)")
【讨论】:
【参考方案4】:代码中的问题是receive(on:)
中使用的readQueue
是并发的。 value 和 completion 都被分别分派到这个队列中,这样 value 和完成的顺序就不能保证了。如果下游订阅者首先收到完成,它将取消其订阅并忽略该值。使readQueue
串行工作与直接使用另一个串行队列(例如DispatchQueue.main
)一样。
使用subscribe(on:)
而不是receive(on:)
与并发队列结合使用,因为promise
调用会导致值和完成发送一起分派。
【讨论】:
以上是关于合并:发布者有时会失去价值并完成的主要内容,如果未能解决你的问题,请参考以下文章
UIScrollView 有时会在更改内部 UITableView 的高度时失去滚动