从其他 Observable 获取值的 Rx Observable

Posted

技术标签:

【中文标题】从其他 Observable 获取值的 Rx Observable【英文标题】:Rx Observable that gets value from other Observable 【发布时间】:2017-08-02 15:58:57 【问题描述】:

我是 RxSwift 和 MVVM 的新手。

我的 viewModel 有一个名为 rx_fetchItems(for:) 的方法,它负责从后端获取相关内容的繁重工作,并返回 Observable<[Item]>

我的目标是提供一个名为 collectionItems 的 viewModel 的可观察属性,以及从 rx_fetchItems(for:) 返回的最后一个发射元素,为我的 collectionView 提供数据。

Daniel T 提供了这个我可能会使用的解决方案:

protocol ServerAPI 
    func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>


    struct ViewModel 

        let collectionItems: Observable<[Item]>
        let error: Observable<Error>

        init(controlValue: Observable<Int>, api: ServerAPI) 
            let serverItems = controlValue
                .map  ItemCategory(rawValue: $0) 
                .filter  $0 != nil .map  $0!  // or use a `filterNil` operator if you already have one implemented.
                .flatMap  api.rx_fetchItems(for: $0)
                    .materialize()
                
                .filter  $0.isCompleted == false 
                .shareReplayLatestWhileConnected()

            collectionItems = serverItems.filter  $0.element != nil .dematerialize()
            error = serverItems.filter  $0.error != nil .map  $0.error! 
        

    

这里唯一的问题是我当前的 ServerAPI aka FirebaseAPI 没有这样的协议方法,因为它被设计为使用单一方法来触发所有请求,如下所示:

class FirebaseAPI 

    private let session: URLSession

    init() 
        self.session = URLSession.shared
    

    /// Responsible for Making actual API requests & Handling response
    /// Returns an observable object that conforms to JSONable protocol.
    /// Entities that confrom to JSONable just means they can be initialized with json.
    func rx_fireRequest<Entity: JSONable>(_ endpoint: FirebaseEndpoint, ofType _: Entity.Type ) -> Observable<[Entity]> 

        return Observable.create  [weak self] observer in
            self?.session.dataTask(with: endpoint.request, completionHandler:  (data, response, error) in

                /// Parse response from request.
                let parsedResponse = Parser(data: data, response: response, error: error)
                    .parse()

                switch parsedResponse 

                case .error(let error):
                    observer.onError(error)
                    return

                case .success(let data):

                    var entities = [Entity]()

                    switch endpoint.method 

                    /// Flatten JSON strucuture to retrieve a list of entities.
                    /// Denoted by 'GETALL' method.
                    case .GETALL:

                        /// Key (underscored) is unique identifier for each entity, which is not needed here.
                        /// value is k/v pairs of entity attributes.
                        for (_, value) in data 
                            if let value = value as? [String: AnyObject], let entity = Entity(json: value) 
                                entities.append(entity)
                            
                        

                        // Need to force downcast for generic type inference.
                        observer.onNext(entities as! [Entity])
                        observer.onCompleted()

                    /// All other methods return JSON that can be used to initialize JSONable entities 
                    default:
                        if let entity = Entity(json: data) 
                        observer.onNext([entity] as! [Entity])
                        observer.onCompleted()
                     else 
                        observer.onError(NetworkError.initializationFailure)
                        
                    
                
            ).resume()
            return Disposables.create()
        
    

rx_fireRequest 方法最重要的一点是它接受了一个FirebaseEndpoint

/// Conforms to Endpoint protocol in extension, so one of these enum members will be the input for FirebaseAPI's `fireRequest` method.

enum FirebaseEndpoint 

    case saveUser(data: [String: AnyObject])
    case fetchUser(id: String)
    case removeUser(id: String)

    case saveItem(data: [String: AnyObject])
    case fetchItem(id: String)
    case fetchItems
    case removeItem(id: String)

    case saveMessage(data: [String: AnyObject])
    case fetchMessages(chatroomId: String)
    case removeMessage(id: String)


为了使用 Daniel T 的解决方案,Id 必须将每个枚举案例从 FirebaseEndpoint 转换为 FirebaseAPI 中的方法。在每种方法中,请致电rx_fireRequest... 如果我是正确的。

如果它有助于更​​好的服务器 API 设计,我会渴望做出这种改变。所以简单的问题是,这个重构是否会改善我的整体 API 设计以及它与 ViewModel 的交互方式。我意识到这正在演变为代码审查。

还...这是该协议方法及其助手的实现:

 func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>  
        // fetched items returns all items in database as Observable<[Item]>
        let fetchedItems = client.rx_fireRequest(.fetchItems, ofType: Item.self)
        switch category 
        case .Local:
            let localItems = fetchedItems
            .flatMapLatest  [weak self] (itemList) -> Observable<[Item]> in
                return self!.rx_localItems(items: itemList)
            

            return localItems

            // TODO: Handle other cases like RecentlyAdded, Trending, etc..
        
    

    // Helper method to filter items for only local items nearby user.
    private func rx_localItems(items: [Item]) -> Observable<[Item]> 
        return Observable.create  observable in
            observable.onNext(items.filter  $0.location == "LA" )
            observable.onCompleted()
            return Disposables.create()
        
    

如果我对 MVVM 或 RxSwift 或 API 设计的方法是错误的,请批评。

【问题讨论】:

【参考方案1】:

我知道开始理解 RxSwift 很难

我喜欢使用Subjects 或Variables 作为ViewModel 的输入,而Observables 或Drivers 作为ViewModel 的输出

通过这种方式,您可以将 ViewController 上发生的操作绑定到 ViewModel,在那里处理逻辑,并更新输出

这是一个重构代码的示例

查看模型

// Inputs
let didSelectItemCategory: PublishSubject<ItemCategory> = .init()

// Outputs
let items: Observable<[Item]>

init() 
    let client = FirebaseAPI()

    let fetchedItems = client.rx_fireRequest(.fetchItems, ofType: Item.self)

    self.items = didSelectItemCategory
        .withLatestFrom(fetchedItems, resultSelector:  itemCategory, fetchedItems in
            switch itemCategory 
            case .Local:
                return fetchedItems.filter  $0.location == "Los Angeles" 
            default: return []
            
        )

视图控制器

segmentedControl.rx.value
    .map(ItemCategory.init(rawValue:))
    .startWith(.Local)
    .bind(to: viewModel.didSelectItemCategory)
    .disposed(by: disposeBag)

viewModel.items
    .subscribe(onNext:  items in
        // Do something
    )
    .disposed(by: disposeBag)

【讨论】:

恕我直言,从长远来看,这不是对主题的适当使用,但它可以帮助您度过学习曲线...davesexton.com/blog/post/… 感谢@DanielT. 的评论,但是如果您不手动触发该主题的事件,您认为这是一个问题吗?在这种情况下,它仅用于将操作绑定到 vm... 在这种特殊情况下,segmentedControl.rx.value 已经是一个 observable,所以你不需要PublishSubject。只需将 observable 传递到视图模型并直接绑定它,而不是使用主题作为中介。【参考方案2】:

我认为您遇到的问题是,您在可观察范式上只进行了一半,这让您失望。尝试一直使用它,看看是否有帮助。例如:

protocol ServerAPI 
    func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>


struct ViewModel 

    let collectionItems: Observable<[Item]>
    let error: Observable<Error>

    init(controlValue: Observable<Int>, api: ServerAPI) 
        let serverItems = controlValue
            .map  ItemCategory(rawValue: $0) 
            .filter  $0 != nil .map  $0!  // or use a `filterNil` operator if you already have one implemented.
            .flatMap  api.rx_fetchItems(for: $0)
                .materialize()
            
            .filter  $0.isCompleted == false 
            .shareReplayLatestWhileConnected()

        collectionItems = serverItems.filter  $0.element != nil .dematerialize()
        error = serverItems.filter  $0.error != nil .map  $0.error! 
    

编辑以处理评论中提到的问题。您现在需要传入具有rx_fetchItems(for:) 方法的对象。您应该拥有多个这样的对象:一个指向服务器,一个不指向任何服务器,而是返回预设数据,以便您可以测试任何可能的响应,包括错误。 (视图模型不应该直接与服务器对话,而应该通过中介来进行...

上面的秘诀是materialize 运算符,它将错误事件包装到包含错误对象的正常事件中。这样您就可以阻止网络错误关闭整个系统。


针对您问题中的更改...您可以简单地使 FirebaseAPI 符合 ServerAPI:

extension FirebaseAPI: ServerAPI 
    func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>  
        // fetched items returns all items in database as Observable<[Item]>
        let fetchedItems = self.rx_fireRequest(.fetchItems, ofType: Item.self)
        switch category 
        case .Local:
            let localItems = fetchedItems
                .flatMapLatest  [weak self] (itemList) -> Observable<[Item]> in
                    return self!.rx_localItems(items: itemList)
            

            return localItems

            // TODO: Handle other cases like RecentlyAdded, Trending, etc..
        
    

    // Helper method to filter items for only local items nearby user.
    private func rx_localItems(items: [Item]) -> Observable<[Item]> 
        return Observable.create  observable in
            observable.onNext(items.filter  $0.location == "LA" )
            observable.onCompleted()
            return Disposables.create()
        
    

此时您可能应该将 ServerAPI 的名称更改为 FetchItemsAPI 之类的名称。

【讨论】:

谢谢丹尼尔,这真的很有帮助。我在编译这个时遇到了麻烦。如果您通过 flatMap 在 rx_fetchItems 方法中输入类别,则会出现错误,显示“闭包无法隐式捕获变异的 self 参数”。我认为这是因为 viewModel 是一个结构。但是如果我将 viewModel 更改为一个类,那么我会收到错误,因为我在所有成员初始化之前在闭包中捕获了 self。所以看来我不能在初始化程序中有这个代码。你对如何解决这个问题有什么建议吗?感谢您的宝贵时间。 也许这段代码可以放在名为 category 的属性的 didSet 中?然后我可以删除 map 和 filter 运算符,并通过在 didSet 中调用 rx_fetchItems 开始?并且属性 collectionItems & error 将是可选的。 我的 API 客户端实际上具有从服务器加载数据列表的代码,并从 json 为列表中的每个项目实例化一个模型。由于 1000 个初始化失败的可能性,我选择不发出错误,所以我仍然可以检索其他 999 个成功的模型。似乎如果我使用物化,我可以继续并发出错误。这实际上是在我的question 中讨论的,你理所当然地投票结束了。 您不应该得到“闭包不能隐式捕获变异的 self 参数”,因为在我写的闭包中没有捕获 self 。将代码放在didSet 中会破坏算法。每次设置值时,您都必须替换 collectionItemserror,这将破坏您的所有订阅。 Rx 是声明性的,您在viewDidLoad 中设置一次,它在视图控制器的生命周期内都有效。不断重置是错误的。 我想我理解编译问题。我会调整代码以适应...【参考方案3】:

你在这里遇到了一个棘手的情况,因为你的 observable 可能会抛出一个错误,一旦它抛出一个错误,observable 序列就会出错,并且不能发出更多的事件。因此,要处理后续的网络请求,您必须重新分配采用当前采用的方法。但是,这通常不适合驱动 UI 元素(例如集合视图),因为您每次都必须绑定到重新分配的 observable。在驱动 UI 元素时,您应该倾向于保证不会出错的类型(即变量和驱动程序)。您可以将Observable&lt;[Item]&gt; 设置为let items = Variable&lt;[Item]&gt;([]),然后您可以将该变量的值设置为来自新网络请求的项目数组。您可以使用 RxDataSources 或类似的方法安全地将这个变量绑定到您的集合视图。然后,您可以为来自网络请求的错误消息创建一个单独的变量,例如 let errorMessage = Variable&lt;String?&gt;(nil),然后您可以将 errorMessage 字符串绑定到标签或类似的东西以显示您的错误消息。

【讨论】:

很好的答案,谢谢!我是否将我的 items 可观察变量分配给正确位置的获取项目的结果(在 didset 中订阅)。在 viewcontroller 中,当我订阅 items 属性时,onNext 块中不会打印任何内容。

以上是关于从其他 Observable 获取值的 Rx Observable的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot 2 - 从 RestControler 返回 rx.Observable

如何声明 Rx.Observable.fromPromise() 的 TypeScript 返回类型

RxCocoa-observable 正在从 API 发出新数据,但 tableview.rx.items 不显示新数据

入门Rx-Observable的创建方式

Rx 操作符七

Rx Java:Observable 在订阅者请求之前发出项目