合并发布者时如何处理错误?

Posted

技术标签:

【中文标题】合并发布者时如何处理错误?【英文标题】:How are errors handled when combining publishers? 【发布时间】:2021-05-03 16:35:04 【问题描述】:

我最近开始使用 Combine,我正在尝试创建一个返回多个数据源组合的存储库。为此,每个数据源都以一种非常简单的加载方法加载自己的数据:

func loadData() -> AnyPublisher<DataObject, Error>

要合并我想使用combineLatest 的数据源,因为它会等待数据源完成加载,然后它会发布包含数据的组合集或指示失败的错误:

func loadData() -> AnyPublisher<[DataObject], Error> 
   return dataSource1.loadData()
             .combineLatest(dataSource2.loadData())
             .map  $0.0 + $0.1 
             .eraseToAnyPublisher()

总的来说,它的行为似乎还不错,我可以调用repository.loadData(),我会得到一个包含这两个项目数据的数组。但是,如果任何数据源发生故障,情况并非如此。在这种情况下,无论其他数据源是否成功,加载方法都会返回错误。

在合并发布者时,是否有标准或推荐的方法来收集所有错误?在我的使用环境中,我希望只有在两个发布者都失败时才能够丢弃错误,但如果只有其中一个发布者失败,则通过并成功。

【问题讨论】:

combineLatest 没有给你一个数组 - 它提供了一个来自第一个数据源的最新值和来自第二个数据源的最新值的元组,所以你的 loadData 示例是错误的 澄清一下,除非两个上游发布者都失败了,否则您想忽略错误,但如果只有一个失败,则继续发出值? 是的,完全正确。我编辑了示例,使其包含一个数组,这只是为了展示错误的问题。 您的编辑仍然无法正常工作,除非您有一个自定义的 + 函数,该函数创建了一个包含两个 DataObject 对象的数组。也许您的意思是 .map [$0.0, $0.1] ?但无论如何,如果失败了,你会发出什么值?或者您是否正在寻找 Merge 行为?这不会给你一个数组 - 它会交错来自每个发布者的值 【参考方案1】:

认为你是说你想要这个:

    如果dataSource1 失败并且dataSource2 产生输出,则丢弃来自dataSource1 的失败并仅传递来自dataSource2 的输出。

    如果dataSource1 产生输出而dataSource2 失败,则仅传递dataSource1 的输出并丢弃dataSource2 的失败。

    如果dataSource1dataSource2 都产生输出,则传递组合输出。

    如果dataSource1dataSource2 都失败,则传递其中一个错误。

我假设每个数据源最多产生一个输出。这是一个测试设置:

typealias DataObject = String

struct DataSource 
    var result: Result<DataObject, Error>

    func loadData() -> AnyPublisher<DataObject, Error> 
        return result.publisher.eraseToAnyPublisher()
    

我们确实想使用combineLatest,但我们不能让combineLatest 的任一输入失败,因为这会使combineLatest 失败。我们只希望combineLatest两个数据源都失败的情况下失败。因此,我们需要一种方法将错误传递给combineLatest,作为其输入发布者之一的输出,而不是作为其输入发布者之一的失败

我们通过将每个输入发布者转换为具有Result&lt;DataObject, Error&gt;OutputFailureNever 来做到这一点。

func combine(
    _ source1: DataSource,
    _ source2: DataSource
) -> AnyPublisher<[DataObject], Error> 
    let ds1 = source1.loadData()
        .map  Result.success($0) 
        .catch  Just(Result.failure($0)) 

    let ds2 = source2.loadData()
        .map  Result.success($0) 
        .catch  Just(Result.failure($0)) 

    let combo = ds1.combineLatest(ds2)
        .tryMap  r1, r2 -> [DataObject] in
            switch (r1, r2) 
            case (.success(let s1), .success(let s2)): return [s1, s2]
            case (.success(let s1), .failure(_)): return [s1]
            case (.failure(_), .success(let s2)): return [s2]
            case (.failure(let f1), .failure(_)): throw f1
            
        

    return combo.eraseToAnyPublisher()

让我们测试一下:

struct MockError: Error  

combine(.init(result: .success("hello")), .init(result: .success("world")))
    .sink(
        receiveCompletion:  print($0) ,
        receiveValue:  print($0) )
// Output:
// ["hello", "world"]
// finished

combine(.init(result: .success("hello")), .init(result: .failure(MockError())))
    .sink(
        receiveCompletion:  print($0) ,
        receiveValue:  print($0) )
// Output:
// ["hello"]
// finished

combine(.init(result: .failure(MockError())), .init(result: .success("world")))
    .sink(
        receiveCompletion:  print($0) ,
        receiveValue:  print($0) )
// Output:
// ["world"]
// finished

combine(.init(result: .failure(MockError())), .init(result: .failure(MockError())))
    .sink(
        receiveCompletion:  print($0) ,
        receiveValue:  print($0) )
// Output:
// failure(__lldb_expr_28.MockError())

【讨论】:

这是一个很棒的解决方案,效果很好,非常感谢!

以上是关于合并发布者时如何处理错误?的主要内容,如果未能解决你的问题,请参考以下文章

离开页面时如何处理jQuery ajax发布错误

使用app.get调用错误端点时如何处理?

在 Sequelize 中使用 .create(...) 方法时如何处理错误

处理包/模块中的错误时如何处理Python异常

使用 Kafka Streams DSL 时如何处理错误和不提交

当 RabbitMQ 交换不存在时如何处理错误(并且消息通过消息传递网关接口发送)