从其他 Observable 获取值的 Rx Observable
Posted
技术标签:
【中文标题】从其他 Observable 获取值的 Rx Observable【英文标题】:Rx Observable that gets value from other Observable 【发布时间】:2017-08-02 15:58:57 【问题描述】:我是 RxSwift 和 MVVM 的新手。
我的 viewModel 有一个名为 rx_fetchItems(for:)
的方法,它负责从后端获取相关内容的繁重工作,并返回 Observable<[Item]>
。
我的目标是提供一个名为 collectionItems
的 viewModel 的可观察属性,以及从 rx_fetchItems(for:)
返回的最后一个发射元素,为我的 collectionView 提供数据。
Daniel T 提供了这个我可能会使用的解决方案:
protocol ServerAPI
func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>
struct ViewModel
let collectionItems: Observable<[Item]>
let error: Observable<Error>
init(controlValue: Observable<Int>, api: ServerAPI)
let serverItems = controlValue
.map ItemCategory(rawValue: $0)
.filter $0 != nil .map $0! // or use a `filterNil` operator if you already have one implemented.
.flatMap api.rx_fetchItems(for: $0)
.materialize()
.filter $0.isCompleted == false
.shareReplayLatestWhileConnected()
collectionItems = serverItems.filter $0.element != nil .dematerialize()
error = serverItems.filter $0.error != nil .map $0.error!
这里唯一的问题是我当前的 ServerAPI aka FirebaseAPI 没有这样的协议方法,因为它被设计为使用单一方法来触发所有请求,如下所示:
class FirebaseAPI
private let session: URLSession
init()
self.session = URLSession.shared
/// Responsible for Making actual API requests & Handling response
/// Returns an observable object that conforms to JSONable protocol.
/// Entities that confrom to JSONable just means they can be initialized with json.
func rx_fireRequest<Entity: JSONable>(_ endpoint: FirebaseEndpoint, ofType _: Entity.Type ) -> Observable<[Entity]>
return Observable.create [weak self] observer in
self?.session.dataTask(with: endpoint.request, completionHandler: (data, response, error) in
/// Parse response from request.
let parsedResponse = Parser(data: data, response: response, error: error)
.parse()
switch parsedResponse
case .error(let error):
observer.onError(error)
return
case .success(let data):
var entities = [Entity]()
switch endpoint.method
/// Flatten JSON strucuture to retrieve a list of entities.
/// Denoted by 'GETALL' method.
case .GETALL:
/// Key (underscored) is unique identifier for each entity, which is not needed here.
/// value is k/v pairs of entity attributes.
for (_, value) in data
if let value = value as? [String: AnyObject], let entity = Entity(json: value)
entities.append(entity)
// Need to force downcast for generic type inference.
observer.onNext(entities as! [Entity])
observer.onCompleted()
/// All other methods return JSON that can be used to initialize JSONable entities
default:
if let entity = Entity(json: data)
observer.onNext([entity] as! [Entity])
observer.onCompleted()
else
observer.onError(NetworkError.initializationFailure)
).resume()
return Disposables.create()
rx_fireRequest
方法最重要的一点是它接受了一个FirebaseEndpoint
。
/// Conforms to Endpoint protocol in extension, so one of these enum members will be the input for FirebaseAPI's `fireRequest` method.
enum FirebaseEndpoint
case saveUser(data: [String: AnyObject])
case fetchUser(id: String)
case removeUser(id: String)
case saveItem(data: [String: AnyObject])
case fetchItem(id: String)
case fetchItems
case removeItem(id: String)
case saveMessage(data: [String: AnyObject])
case fetchMessages(chatroomId: String)
case removeMessage(id: String)
为了使用 Daniel T 的解决方案,Id 必须将每个枚举案例从 FirebaseEndpoint
转换为 FirebaseAPI
中的方法。在每种方法中,请致电rx_fireRequest
... 如果我是正确的。
如果它有助于更好的服务器 API 设计,我会渴望做出这种改变。所以简单的问题是,这个重构是否会改善我的整体 API 设计以及它与 ViewModel 的交互方式。我意识到这正在演变为代码审查。
还...这是该协议方法及其助手的实现:
func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>
// fetched items returns all items in database as Observable<[Item]>
let fetchedItems = client.rx_fireRequest(.fetchItems, ofType: Item.self)
switch category
case .Local:
let localItems = fetchedItems
.flatMapLatest [weak self] (itemList) -> Observable<[Item]> in
return self!.rx_localItems(items: itemList)
return localItems
// TODO: Handle other cases like RecentlyAdded, Trending, etc..
// Helper method to filter items for only local items nearby user.
private func rx_localItems(items: [Item]) -> Observable<[Item]>
return Observable.create observable in
observable.onNext(items.filter $0.location == "LA" )
observable.onCompleted()
return Disposables.create()
如果我对 MVVM 或 RxSwift 或 API 设计的方法是错误的,请批评。
【问题讨论】:
【参考方案1】:我知道开始理解 RxSwift 很难
我喜欢使用Subject
s 或Variable
s 作为ViewModel 的输入,而Observable
s 或Driver
s 作为ViewModel 的输出
通过这种方式,您可以将 ViewController 上发生的操作绑定到 ViewModel,在那里处理逻辑,并更新输出
这是一个重构代码的示例
查看模型
// Inputs
let didSelectItemCategory: PublishSubject<ItemCategory> = .init()
// Outputs
let items: Observable<[Item]>
init()
let client = FirebaseAPI()
let fetchedItems = client.rx_fireRequest(.fetchItems, ofType: Item.self)
self.items = didSelectItemCategory
.withLatestFrom(fetchedItems, resultSelector: itemCategory, fetchedItems in
switch itemCategory
case .Local:
return fetchedItems.filter $0.location == "Los Angeles"
default: return []
)
视图控制器
segmentedControl.rx.value
.map(ItemCategory.init(rawValue:))
.startWith(.Local)
.bind(to: viewModel.didSelectItemCategory)
.disposed(by: disposeBag)
viewModel.items
.subscribe(onNext: items in
// Do something
)
.disposed(by: disposeBag)
【讨论】:
恕我直言,从长远来看,这不是对主题的适当使用,但它可以帮助您度过学习曲线...davesexton.com/blog/post/… 感谢@DanielT. 的评论,但是如果您不手动触发该主题的事件,您认为这是一个问题吗?在这种情况下,它仅用于将操作绑定到 vm... 在这种特殊情况下,segmentedControl.rx.value
已经是一个 observable,所以你不需要PublishSubject
。只需将 observable 传递到视图模型并直接绑定它,而不是使用主题作为中介。【参考方案2】:
我认为您遇到的问题是,您在可观察范式上只进行了一半,这让您失望。尝试一直使用它,看看是否有帮助。例如:
protocol ServerAPI
func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>
struct ViewModel
let collectionItems: Observable<[Item]>
let error: Observable<Error>
init(controlValue: Observable<Int>, api: ServerAPI)
let serverItems = controlValue
.map ItemCategory(rawValue: $0)
.filter $0 != nil .map $0! // or use a `filterNil` operator if you already have one implemented.
.flatMap api.rx_fetchItems(for: $0)
.materialize()
.filter $0.isCompleted == false
.shareReplayLatestWhileConnected()
collectionItems = serverItems.filter $0.element != nil .dematerialize()
error = serverItems.filter $0.error != nil .map $0.error!
编辑以处理评论中提到的问题。您现在需要传入具有rx_fetchItems(for:)
方法的对象。您应该拥有多个这样的对象:一个指向服务器,一个不指向任何服务器,而是返回预设数据,以便您可以测试任何可能的响应,包括错误。 (视图模型不应该直接与服务器对话,而应该通过中介来进行...
上面的秘诀是materialize
运算符,它将错误事件包装到包含错误对象的正常事件中。这样您就可以阻止网络错误关闭整个系统。
针对您问题中的更改...您可以简单地使 FirebaseAPI 符合 ServerAPI:
extension FirebaseAPI: ServerAPI
func rx_fetchItems(for category: ItemCategory) -> Observable<[Item]>
// fetched items returns all items in database as Observable<[Item]>
let fetchedItems = self.rx_fireRequest(.fetchItems, ofType: Item.self)
switch category
case .Local:
let localItems = fetchedItems
.flatMapLatest [weak self] (itemList) -> Observable<[Item]> in
return self!.rx_localItems(items: itemList)
return localItems
// TODO: Handle other cases like RecentlyAdded, Trending, etc..
// Helper method to filter items for only local items nearby user.
private func rx_localItems(items: [Item]) -> Observable<[Item]>
return Observable.create observable in
observable.onNext(items.filter $0.location == "LA" )
observable.onCompleted()
return Disposables.create()
此时您可能应该将 ServerAPI
的名称更改为 FetchItemsAPI
之类的名称。
【讨论】:
谢谢丹尼尔,这真的很有帮助。我在编译这个时遇到了麻烦。如果您通过 flatMap 在 rx_fetchItems 方法中输入类别,则会出现错误,显示“闭包无法隐式捕获变异的 self 参数”。我认为这是因为 viewModel 是一个结构。但是如果我将 viewModel 更改为一个类,那么我会收到错误,因为我在所有成员初始化之前在闭包中捕获了 self。所以看来我不能在初始化程序中有这个代码。你对如何解决这个问题有什么建议吗?感谢您的宝贵时间。 也许这段代码可以放在名为 category 的属性的 didSet 中?然后我可以删除 map 和 filter 运算符,并通过在 didSet 中调用 rx_fetchItems 开始?并且属性 collectionItems & error 将是可选的。 我的 API 客户端实际上具有从服务器加载数据列表的代码,并从 json 为列表中的每个项目实例化一个模型。由于 1000 个初始化失败的可能性,我选择不发出错误,所以我仍然可以检索其他 999 个成功的模型。似乎如果我使用物化,我可以继续并发出错误。这实际上是在我的question 中讨论的,你理所当然地投票结束了。 您不应该得到“闭包不能隐式捕获变异的 self 参数”,因为在我写的闭包中没有捕获 self 。将代码放在didSet
中会破坏算法。每次设置值时,您都必须替换 collectionItems
和 error
,这将破坏您的所有订阅。 Rx 是声明性的,您在viewDidLoad
中设置一次,它在视图控制器的生命周期内都有效。不断重置是错误的。
我想我理解编译问题。我会调整代码以适应...【参考方案3】:
你在这里遇到了一个棘手的情况,因为你的 observable 可能会抛出一个错误,一旦它抛出一个错误,observable 序列就会出错,并且不能发出更多的事件。因此,要处理后续的网络请求,您必须重新分配采用当前采用的方法。但是,这通常不适合驱动 UI 元素(例如集合视图),因为您每次都必须绑定到重新分配的 observable。在驱动 UI 元素时,您应该倾向于保证不会出错的类型(即变量和驱动程序)。您可以将Observable<[Item]>
设置为let items = Variable<[Item]>([])
,然后您可以将该变量的值设置为来自新网络请求的项目数组。您可以使用 RxDataSources 或类似的方法安全地将这个变量绑定到您的集合视图。然后,您可以为来自网络请求的错误消息创建一个单独的变量,例如 let errorMessage = Variable<String?>(nil)
,然后您可以将 errorMessage 字符串绑定到标签或类似的东西以显示您的错误消息。
【讨论】:
很好的答案,谢谢!我是否将我的 items 可观察变量分配给正确位置的获取项目的结果(在 didset 中订阅)。在 viewcontroller 中,当我订阅 items 属性时,onNext 块中不会打印任何内容。以上是关于从其他 Observable 获取值的 Rx Observable的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot 2 - 从 RestControler 返回 rx.Observable
如何声明 Rx.Observable.fromPromise() 的 TypeScript 返回类型
RxCocoa-observable 正在从 API 发出新数据,但 tableview.rx.items 不显示新数据