使用 RxJava 延迟获取分页对象

Posted

技术标签:

【中文标题】使用 RxJava 延迟获取分页对象【英文标题】:Lazy fetching of paginated objects using RxJava 【发布时间】:2015-04-10 06:58:05 【问题描述】:

我几乎被卖给了 RxJava,它是 Retrofit 的完美伴侣,但我在迁移代码时遇到了一个常见的模式:为了节省带宽,我想从我的webservice 根据需要,而我的 listview(或 recyclerview)正在使用响应式编程滚动。

我之前的代码完美地完成了这项工作,但反应式编程似乎值得一试。

收听 listview/recyclerview 滚动(和其他无聊的东西)不是问题,使用 Retrofit 很容易获得 Observable:

@GET("/api/messages")
Observable<List<Message>> getMessages(@Path("offset") int offset, @Path("limit") int limit);

我只是想不出在反应式编程中使用的模式。

Concat 运算符似乎是一个很好的起点,在某些时候还可以使用 ConnectableObservable 来延迟发射,也许还有 flatMap,但是如何?

编辑:

这是我目前(幼稚)的解决方案:

public interface Paged<T> 
    boolean isLoading();
    void cancel();
    void next(int count);
    void next(int count, Scheduler scheduler);
    Observable<List<T>> asObservable();
    boolean hasCompleted();
    int position();

以及我使用主题的实现:

public abstract class SimplePaged<T> implements Paged<T> 

    final PublishSubject<List<T>> subject = PublishSubject.create();
    private volatile boolean loading;
    private volatile int offset;
    private Subscription subscription;

    @Override
    public boolean isLoading() 
        return loading;
    

    @Override
    public synchronized void cancel() 
        if(subscription != null && !subscription.isUnsubscribed())
            subscription.unsubscribe();

        if(!hasCompleted())
            subject.onCompleted();

        subscription = null;
        loading = false;
    

    @Override
    public void next(int count) 
        next(count, null);
    

    @Override
    public synchronized void next(int count, Scheduler scheduler) 
        if (isLoading())
            throw new IllegalStateException("you can't call next() before onNext()");

        if(hasCompleted())
            throw new IllegalStateException("you can't call next() after onCompleted()");

        loading = true;

        Observable<List<T>> obs = onNextPage(offset, count).single();

        if(scheduler != null)
            obs = obs.subscribeOn(scheduler); // BEWARE! onNext/onError/onComplete will happens on that scheduler!

        subscription = obs.subscribe(this::onNext, this::onError, this::onComplete);
    

    @Override
    public Observable<List<T>> asObservable() 
        return subject.asObservable();
    

    @Override
    public boolean hasCompleted() 
        return subject.hasCompleted();
    

    @Override
    public int position() 
        return offset;
    

    /* Warning: functions below may be called from another thread */
    protected synchronized void onNext(List<T> items) 
        if (items != null)
            offset += items.size();

        loading = false;

        if (items == null || items.size() == 0)
            subject.onCompleted();
        else
            subject.onNext(items);
    

    protected synchronized void onError(Throwable t) 
        loading = false;
        subject.onError(t);
    

    protected synchronized void onComplete() 
        loading = false;
    

    abstract protected Observable<List<T>> onNextPage(int offset, int count);


【问题讨论】:

我希望有一个 observable 在必须获取新消息时发出事件,即到达页面底部。然后,您可以为这个 observable 订阅一个函数,以获取每个事件的消息并将它们附加到页面。 @nono240 :在阅读 lopar 的答案之前,我最终得到了一些“概念上”类似于你所做的事情(基本上具有加载状态)。顺便说一句:我认为您可以将几个“loading = false”替换为集中的“finallyDo”(reactivex.io/documentation/operators/do.html @nono240 此外:运算符“withLatestFrom”(github.com/ReactiveX/RxJava/releases/tag/v1.0.7) 最近作为实验性发布,在这种情况下可能有用,我稍后会调查(我是 Rx 编程的新手,所以让我们看,也许根本没有意义!) 【参考方案1】:

这是处理反应式分页的几种潜在方法之一。假设我们有一个方法getNextPageTrigger,它返回一个Observable,当滚动监听器(或任何输入)想要加载一个新页面时,它会发出一些事件对象。在现实生活中,它可能会有 debounce 运算符,但除此之外,我们将确保仅在最新页面加载后触发它。

我们还定义了一种方法来从列表中解包消息:

Observable<Message> getPage(final int page) 
  return service.getMessages(page * PAGE_SIZE, PAGE_SIZE)
      .flatMap(messageList -> Observable.from(messageList));

然后我们就可以做实际的抓取逻辑了:

// Start with the first page.
getPage(0)
    // Add on each incremental future page.
    .concatWith(Observable.range(1, Integer.MAX_VALUE)
        // Uses a little trick to get the next page to wait for a signal to load.
        // By ignoring all actual elements emitted and casting, the trigger must
        // complete before the actual page request will be made.
        .concatMap(page -> getNextPageTrigger().limit(1)
            .ignoreElements()
            .cast(Message.class)
            .concatWith(getPage(page)))  // Then subscribe, etc..

这仍然遗漏了一些可能很重要的事情:

1 - 这显然不知道何时停止获取其他页面,这意味着一旦它到达末尾,取决于服务器返回的内容,每次触发滚动时它可能会不断出现错误或空结果。解决此问题的方法取决于您如何向客户端发出没有更多页面要加载的信号。

2 - 如果您需要重试错误,我建议您查看 retryWhen 运算符。否则,常见的网络错误可能会导致页面加载错误传播。

【讨论】:

我最终做了这样的事情(见我的编辑),但我不是 100% 满意。 我只是错过了您的解决方案不允许在内容滚动期间根据需要“按需”获取页面。 实际上,这就是getNextPageTrigger() 的用途。它是您在其中注入的任何可观察到的内容,可能是用户触发的,也可能是滚动侦听器提供的Subject。基本上任何你想要的。 :) 我明白了!感谢您的澄清。你的方法似乎比我的更“被动”,我可能会将它们混合使用。

以上是关于使用 RxJava 延迟获取分页对象的主要内容,如果未能解决你的问题,请参考以下文章

使用 RxJava 处理分页

使用 rxjava 避免 OVER_QUERY_LIMIT

使用ivx实现分页获取数据的经验总结

Quickblox:如何使用分页来使用 Web SDK 获取自定义对象

如何使用 Retrofit 2 和 RxJava 处理分页

Rxjava 2 - 使用领域和改造获取数据