为啥 RxJava 的“订阅”方法会被多次调用?

Posted

技术标签:

【中文标题】为啥 RxJava 的“订阅”方法会被多次调用?【英文标题】:Why RxJava's "subscribe" method called multiple times?为什么 RxJava 的“订阅”方法会被多次调用? 【发布时间】:2018-12-24 19:09:46 【问题描述】:

我正在使用 RxJava 开发一个 android 应用程序。 例如,我想从本地数据库中获取一些用户数据。 但如果本地数据库没有用户,我应该使用 REST API 获取用户。

class Presenter 
    val mDisposable = CompositeDisposable()

    override fun getUsersFromLocal() 
        Log.d("TAG", "This is called just one")
        mDisposable.add(localDatabase.userDao().getUsers()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe  // This "subscribe" is the problem. Here is called multiple......
                Log.d("TAG", "This subscribe is called multiple, It called more than 10 times")
                if (it.isEmpty()) 
                    secondCall()
                 else 
                    view.onUsersLoaded(it)
                
            )
    

    override fun getUsersFromRemote() 
        mDisposable.add(restApi.getUsers()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe 
                saveLocal(it) // If I remove just this line, subscribe is called once. (works fine)
                view.onUsersLoaded(it)
            )
    

我正在使用嵌套的(?) RxJava。 不知道为什么firstCall的subscribe方法被调用了多个... 如果我删除“secondCall()”方法和逻辑,“subscribe”只会调用一次。


我找到了一条线索。 “saveLocal”方法在后台线程中运行。

    private fun saveLocal(users: List<User>) 
        users.forEach 
            appExecutors.diskIo.execute 
                localDatabase.userDao().saveUser(it)
            
        
    

我将上源代码更改为下。

    private fun saveLocal(users: List<User>) 
        appExecutors.diskIo.execute 
            localDatabase.userDao().saveUsers(users)
        
    

更改后,“订阅”只被调用了两次。 是的,它仍然有问题。 但在此之前,“订阅”被多次调用。 (我认为它被称为“用户”大小。)


我也添加了“UserDao 类”和“RestApi 类”代码。

import android.arch.persistence.room.*
import io.reactivex.Flowable
import io.reactivex.Single

@Dao
interface UserDao 

    @Query("SELECT * FROM user ORDER BY id DESC")
    fun getUsers(): Flowable<List<User>>


这里是“RestApiService 类”。

import io.reactivex.Observable

interface RestApiService 
    @Headers(API_HEADER_AUTHORIZATION, API_HEADER_ACCEPT)
    @GET("/users")
    fun getUsers(): Observable<UserList> // UserList class has a list of the user

【问题讨论】:

提供的代码并不意味着您会收到两次firstCall 的调用,因此您的原始代码可能存在错误。您调用了两次firstCall,或者您在secondCall.subscribe 调用中出现了复制粘贴错误,该调用调用了firstCall 您的代码运行良好。查找所有 firstCall 用法,您可能在代码中某处调用了 firstCall 两次。 【参考方案1】:

对于您的案例,您需要使用“Maybe”而不是“Flowable”。

使用 Flowable,如果您再次添加新数据或更新之前插入的对象,Flowable 对象将自动发出并再次调用 subscribe 方法:

此更改可能会修复您的代码:

@Dao
interface UserDao 

    @Query("SELECT * FROM user ORDER BY id DESC")
    fun getUsers(): Maybe<List<User>>


有关 Flowable、Single 和 Maybe 的更多信息,请参阅下面的文章” Room implementation with RxJava

【讨论】:

【参考方案2】:

您应该使用 switchIfEmpty 运算符,而不是在订阅中使用 if 语句。基本上你做了两个订阅

【讨论】:

虽然这是一个很好的建议,但这并不是问题的解决方案

以上是关于为啥 RxJava 的“订阅”方法会被多次调用?的主要内容,如果未能解决你的问题,请参考以下文章

为啥 ContentObserver 会被多次调用?

Rxjava,改造和多次调用

RxJava 观察调用/订阅线程

在 RxJava2(Android) 中订阅 Vs 订阅?

RxJava2.1.0:在不同线程上订阅时未调用 PublishSubject onNext

使用 rxJava 和改造多次调用另一个请求中的请求