RxJava switchMap 不会取消之前的请求

Posted

技术标签:

【中文标题】RxJava switchMap 不会取消之前的请求【英文标题】:RxJava switchMap doesn't cancel the previous Request 【发布时间】:2019-05-25 05:33:31 【问题描述】:

我正在使用 Kotlin、OkHttp、RxJava2 开发一个 android 应用。

我正在尝试进行自动搜索。

当用户输入一个字符时,搜索它。

override fun search(subject: PublishSubject<String>) 
    disposable.add(
            subject.debounce(1000, TimeUnit.MILLISECONDS)
                    .filter  it.isNotEmpty() 
                    .distinctUntilChanged()
                    .switchMap  keyword ->
                        search(keyword)
                    
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe( response ->
                        // using the response
                    , 
                        it.printStackTrace()
                    )
    )

还有搜索功能:

    fun search(keyword: String): Observable<String> 
        val request = Request.Builder()
            .url(url)
            .post(RequestBody.create(contentType, xmlBody(keyword)))
            .build()

        return Observable.fromCallable 
            val response = OkHttpClient().newCall(request).execute()
            response.body()?.string()
        
    

我喜欢“改造”,但就我而言,我应该使用 XML。 我尝试将 Retrofit 与 SimpleXMLConverter 一起使用,但它不适合这个项目。 所以我正在使用“OkHttp”。 但是上面的代码抛出了“InterruptedIOException”。

第一次通话没问题。 但是在处理第一个请求时收到下一个搜索请求时会发生异常。

我应该如何解决这个问题?

我认为,switchMap应该取消之前的Request,但它没有。

【问题讨论】:

search() 正在阻塞switchMap 工作的线程,因此无法取消。将subscribeOn 直接应用于该fromCallable 当你调用OkHttpClient().newCall(request)它返回Call实例,它有特殊的方法cancel。这样的话,最好在 onDispose 发生后使用Observable.create() 并调用Call#cancel() 您找到解决方案了吗? 【参考方案1】:

你把调度器设置错了。

你需要:

override fun search(subject: PublishSubject<String>) 
disposable.add(
        subject.debounce(1000, TimeUnit.MILLISECONDS)
                .filter  it.isNotEmpty() 
                .distinctUntilChanged()
                .switchMap  keyword ->
                    search(keyword).subscribeOn(Schedulers.io())
                

                .observeOn(AndroidSchedulers.mainThread())
                .subscribe( response ->
                    // using the response
                , 
                    it.printStackTrace()
                )
)

注意后台调度器需要直接设置在search Completable中,而不是switchmap之后的链上。 这是在主链以外的一个链中执行搜索,否则整个链都会被阻塞,swichmap 似乎没有任何作用,因为它只有在自然完成后才会取消搜索。

【讨论】:

以上是关于RxJava switchMap 不会取消之前的请求的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava2的操作符FilterDebounce DistinctUntilChanged SwitchMap 的使用

RxJava Subscription 自动取消订阅

RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )

了解rxjs中的SwitchMap

使用rxjava时如何取消慢发射

RxJava2 Observable中的异步任务状态&&取消等效项?