Kotlin Flow无缝衔接Retrofit——FlowCallAdapter
Posted Vincent(朱志强)
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin Flow无缝衔接Retrofit——FlowCallAdapter相关的知识,希望对你有一定的参考价值。
*本篇文章已授权微信公众号 guolin_blog (郭霖)独家发布:
https://mp.weixin.qq.com/s/BCmvMHfX8zGnUL5Om_Ez9w
Kotlin
早已成为android
官方开发语言,企图统一开发规范的Jetpack
系列库基本上天然支持Kotlin
。Android世界里,Retrofit
在Kotlin
之前就已名扬天下,默认情况下,Retrofit
的API
接口方法返回值需声明为Call<ResponseBody>
或Call<Response<ResponseBody>>
。
我们可以通过Converter
对ResponseBody
进行自动解析,如GsonConverter
,则方法返回值可声明为Call<Response<[Bean]>>
或Call<[Bean]>
。
我们还可以通过CallAdapter
对Call<T>
进行转化,如RxjavaCallAdapter
,则方法返回值可声明为Observable<Response<[Bean]>>
,Observable<[Bean]>
。
Retrofit + RxjavaCallAdapter + GsonConverter
是最受欢迎的使用模式。Retrofit
早期并不支持Kotlin
,不支持不代表不可用,毕竟Java
和Kotlin
可以互相调用。在Kotlin
中使用Retrofit
会是这个样子:
Kotlin中使用RxjavaCallAdapter
1. 声明ApiService
interface WeatherApiService
@GET("weather/now.json")
fun getWeatherInfoNow(@Query("location") location: String): Single<WeatherInfo>
2. 调用ApiService
ApiServiceManager.weatherApiService
.getWeatherInfoNow(location = "北京")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
println("process biz data:$it") ,
println("error occurs:$it") ,
)
在Kotlin
的世界里,异步代码应该通过协程来执行,协程的性能优势在此就不赘述了。这种代码依旧停留在基于线程切换的异步代码。在Retrofit
正式推出支持Kotlin
的方案前,Jake Warton
大神推出了一个方案,基于Kotlin
Deferred
的CoroutineCallAdapter。
基于Deferred的CoroutineCallAdapter
1. 添加CoroutineCallAdapter
private val retrofit = Retrofit.Builder()
//...
.addCallAdapterFactory(CoroutineCallAdapterFactory())
//...
.build()
2. 声明ApiService,接口方法返回值声明为Deferred<T>
类型
interface WeatherApiService
@GET("weather/now.json")
fun getWeatherInfoNow(@Query("location") location: String): Deferred<WeatherInfo>
3. 调用ApiService
MainScope().launch
try
val weatherInfo = ApiServiceManager.weatherApiService
.getWeatherInfoNow(location = "北京")
.await()
println("process biz data:$weatherInfo")
catch (ex: Exception)
println("error occurs:$ex")
后来,CoroutineCallAdapter废弃了,因为Retrofit
自身有了支持方案,不需要使用者添加CallAdapter
,ApiService
接口方法可直接声明为suspend
方法。
Retrofit内置的kotlin支持方案
1. 为api接口方法添加suspend关键字
interface WeatherApiService
@GET("weather/now.json")
suspend fun getWeatherInfoNow(@Query("location") location: String): WeatherInfo
其实在Retrofit
内部,会自动为suspend
方法创建CallAdapter
,所以添加了suspend
关键字的api
方法,其返回值可声明为以下类型,后面两种需要已添加GsonConverter
。
① ResponseBody
② Response<ResponseBody>
③ [Bean]
④ Response<[Bean]>
2. 调用ApiService
MainScope().launch
try
val weatherInfo = ApiServiceManager.weatherApiService
.getWeatherInfoNow(location = "北京")
println("process biz data:$weatherInfo")
catch (ex: Exception)
println("error occurs:$ex")
尽管存在上述两种方案,很多小伙伴依然倾向于使用RxjavaCallAdapter
,原因如下:
Rxjava
功能强大的操作符,可进行复杂的数据处理- 响应式编程,数据变换、异常捕获,业务处理均流式处理。上述的示例代码还需用
try-catch
块包裹,不如Rxjava
简洁。当然了,上述代码还可以通过CoroutineExceptionHandler
实现异常捕获,但仍不如Rxjava
优雅和丝滑。 - 上述两种方案的内部实现上最终都是调用
Call
的enqueue
方法,异步调用Call
,线程调度上使用Okhttp
内部的线程池,真正耗时的代码片段在Okhttp
的线程池里执行的,而不是由协程调用方的上下文决定。RxjavaCallAdapter
可通过指定async
参数控制是同步调用Call
还是异步调用Call
。如://同步调用Call //Call.execute()的执行线程由subscribeOn(Scheduler)方法决定 //我们大多采用该种方式 RxJava2CallAdapterFactory.create()
但是,前文说过了,//异步调用Call //仅仅Call.enqueue()的调用发生在subscribeOn(Scheduler)所指定的线程 //真正的耗时操作在Okhttp的线程池里执行 RxJava2CallAdapterFactory.createAsync()
Rxjava
并没有天然支持Kotlin
。
FlowCallAdapter
结合以上的讨论,我们究竟需要一个什么样的CallAdapter
?
- 支持
Kotlin
- 类似
Rxjava
的响应式编程且具有强大的数据处理功能(通过操作符实现) - 灵活指定是同步调用Call还是异步调用
Call
其实,Kotlin
中已经存在可与Rxjava
一争高低的东西了,那就是Flow
。我们只需定义一个FlowCallAdapter
,使Api
接口方法的返回值可声明为Flow<T>
就可以了。轮子已造好,先看如何使用,然后讨论实现。
如何使用FlowCallAdapter
-
第一步,添加依赖
implementation "io.github.vincent-series:sharp-retrofit:1.9"
当然,你的项目里还要有
Retrofit
及Kotlin-Coroutine
的相关依赖。 -
为
Retrofit
添加FlowCallAdapter
private val retrofit = Retrofit.Builder() // ... .addCallAdapterFactory(FlowCallAdapterFactory.create()) // ... .build()
可为
FlowCallAdapterFactory.create()
指定async
参数控制是同步调用Call
还是异步调用Call
,默认为false
,即由协程上下文决定网络请求的执行线程。FlowCallAdapterFactory.create(async = false)
-
声明
Api
接口,如interface WeatherApiService @GET("weather/now.json") fun getWeatherInfoNow(@Query("location") location: String): Flow<WeatherInfo>
Api
方法返回值支持声明为以下类型,后面两种需要已添加GsonConverter
。
①Flow<ResponseBody>
②Flow<Response<ResponseBody>>
③Flow<[Bean]>
④Flow<Response<[Bean]>>
-
调用
Api
MainScope().launch ApiServiceManager.weatherApiService .getWeatherInfoNow(location = "北京") //通过一系列操作符处理数据,如map,如果有必要的话 .map // ... //在Dispatcher.IO上下文中产生订阅数据 .flowOn(Dispatchers.IO) //捕获异常 .catch ex -> //处理异常 println("error occurs:$ex") //订阅数据 .collect //处理数据 println("weather info:$it")
注意:
Flow
一开始同Rxjava
一样,有subscribeOn
和observeOn
方法,后来废弃,产生订阅数据的上下文由flowOn
代替,不再提供对应的observeOn
,Flow
认为接收数据的协程上下文应由调用Flow
的协程决定,如要在主线程接收Flow
发射的数据,只需在MainScope
中订阅Flow
即可。
实现FlowCallAdapter
自定义CallAdapter
需实现接口retrofit2.CallAdapter
,看下它的相关方法:
//泛型参数R表示Call<R>对象的泛型参数,默认为Response<ResponseBody>或ResponseBody,
//如果运用了GsonConverter,有可能是Response<[Bean]>或[Bean]
public interface CallAdapter<R, T>
//将响应体ResponseBody解析为何种类型,如Call<User>或Call<Response<User>>的responseType为User
Type responseType();
//将Call<R>转化成T类型的对象,如Rxjava中,将Call<R>转化成Observable<R>
T adapt(Call<R> call);
BodyFlowCallAdapter
BodyFlowCallAdapter
负责将Call<ResponseBody>
、Call<[Bean]>
转化为Flow<ResponseBody>
和Flow<[Bean]>
。
//R表示response body的类型,默认为okhttp3.ResponseBody,
//有可能被Converter自动解析为其他类型如[Bean]
class BodyFlowCallAdapter<R>(private val responseBodyType: R) : CallAdapter<R, Flow<R>>
//由于我们只是想将Call转为Flow,无意插足ResponseBody的解析
//所以直接原样返回responseBodyType即可
override fun responseType(): Type = responseBodyType as Type
//直接调用bodyFlow(call)返回Flow<R>对象。
override fun adapt(call: Call<R>): Flow<R> = bodyFlow(call)
bodyFlow(call)
方法的实现:
fun <R> bodyFlow(call: Call<R>): Flow<R> = flow
suspendCancellableCoroutine<R> continuation ->
//协程取消时,调用Call.cancel()取消call
continuation.invokeOnCancellation
call.cancel()
try
//执行call.execute()
val response = call.execute()
if (response.isSuccessful)
//http响应[200..300),恢复执行,并返回响应体
continuation.resume(response.body()!!)
else
//其他http响应,恢复执行,并抛出http异常
continuation.resumeWithException(HttpException(response))
catch (e: Exception)
//捕获的其他异常,恢复执行,并抛出该异常
continuation.resumeWithException(e)
.let responseBody ->
//通过flow发射响应体
emit(responseBody)
ResponseFlowCallAdapter
ResponseFlowCallAdapter
负责将Call<Response<ResponseBody>>
、Call<Response<[Bean]>>
转化为Flow<Response<ResponseBody>>
和Flow<Response<[Bean]>>
。
//R表示response body的类型,默认为okhttp3.ResponseBody,
//有可能被Converter自动解析为其他类型如[Bean]
class ResponseFlowCallAdapter<R>(private val responseBodyType: R) :
CallAdapter<R, Flow<Response<R>>>
//由于我们只是想将Call转为Flow,无意插足ResponseBody的解析
//所以直接原样返回responseBodyType即可
override fun responseType() = responseBodyType as Type
//直接调用responseFlow(call)返回Flow<Response<R>>对象。
override fun adapt(call: Call<R>): Flow<Response<R>> = responseFlow(call)
responseFlow(call)
方法的实现:
fun <T> responseFlow(call: Call<T>): Flow<Response<T>> = flow
suspendCancellableCoroutine<Response<T>> continuation ->
//协程取消时,调用call.cancel()取消call
continuation.invokeOnCancellation
call.cancel()
try
//执行call.execute()
val response = call.execute()
//恢复执行,并返回Response
continuation.resume(response)
catch (e: Exception)
//捕获异常,恢复执行,并返回异常
continuation.resumeWithException(e)
.let response ->
//通过flow发射Response
emit(response)
AsyncBodyFlowCallAdapter && AsyncResponseFlowCallAdapter
AsyncBodyFlowCallAdapter
和 AsyncResponseFlowCallAdapter
是异步版的FlowCallAdapter
,这里的异步指的是在实际调用Call
时,调用的是call.enqueue
方法,以AsyncBodyFlowCallAdapter
为例与同步版做下对比:
fun <R> asyncBodyFlow(call: Call<R>): Flow<R> = flow
try
suspendCancellableCoroutine<R> continuation ->
//协程取消时,调用Call.cancel()取消call
continuation.invokeOnCancellation
call.cancel()
//调用call.enqueue(),在callback里恢复执行并返回结果
call.enqueue(object : Callback<R>
override fun onResponse(call: Call<R>, response: Response<R>)
if (response.isSuccessful)
//http响应[200..300),恢复执行并返回响应体
continuation.resume(response.body()!!)
else
//其他http响应,恢复执行并抛出http异常
continuation.resumeWithException(HttpException(response))
override fun onFailure(call: Call<R>, t: Throwable)
//其他捕获的异常,恢复执行并抛出该异常
continuation.resumeWithException(t)
)
.let responseBody->
//通过flow发射响应体
emit(responseBody)
catch (e: Exception)
suspendCoroutineUninterceptedOrReturn<Nothing> continuation ->
Dispatchers.Default.dispatch(continuation.context)
//特殊case处理,确保抛出异常前挂起,感兴趣可参考https://github.com/Kotlin/kotlinx.coroutines/pull/1667#issuecomment-556106349
continuation.intercepted().resumeWithException(e)
COROUTINE_SUSPENDED
定义FlowCallAdapterFactory
最后我们需要定义一个工厂类来提供FlowCallAdapter
实例,工厂类需继承retrofit2.CallAdapter.Factory
抽象类。
class FlowCallAdapterFactory private constructor(private val async: Boolean) :
CallAdapter.Factory()
//returnType,代表api接口方法的返回值类型,annotations为该接口方法的注解
//根据参数返回该接口方法的CallAdapter
override fun get(
returnType: Type,
annotations: Array<out Annotation>,
retrofit: Retrofit,
): CallAdapter<*, *>?
//如果返回值原始类型不是Flow类型,直接返回null,表示不做处理
if (getRawType(returnType) != Flow::class.java) return null
//强制返回值类型为Flow<R>,而不是Flow
if (returnType !is ParameterizedType)
throw IllegalStateException("the flow type must be parameterized as Flow<Foo>!")
//获取Flow的泛型参数
val flowableType = getParameterUpperBound(0, returnType)
//获取Flow的泛型参数的原始类型
val rawFlowableType = getRawType(flowableType)
return if (rawFlowableType == Response::class.java)
//Flow<T>中的T为retrofit2.Response,但不是泛型Response<R>模式
if (flowableType !is ParameterizedType)
throw IllegalStateException("the response type must be parameterized as Response<Foo>!")
//选取Response的泛型参数作为ResponseBody,创建Flow<Response<R>>模式的FlowCallAdapter
val responseBodyType = getParameterUpperBound(0, flowableType)
createResponseFlowCallAdapter(async, responseBodyType)
else
//直接将Flow的泛型参数作为ResponseBody,创建Flow<R>模式的FlowCallAdapter
createBodyFlowCallAdapter(async, flowableType)
companion object
//获取工厂实例的方法
//async表示是异步调用Call还是同步调用Call,默认false,即同步调用,
//同步调用则由协程上下文决定Call.execute()的执行线程
//若为true,则协程只调用Call.enqueue()方法,网络请求在okhttp的线程池里执行
@JvmStatic
fun create(async: Boolean = false) = FlowCallAdapterFactory(async)
开源库地址
最后奉上开源库地址,欢迎交流,喜欢的话请star支持一下吧!
https://github.com/vincent-series/sharp-retrofit。
以上是关于Kotlin Flow无缝衔接Retrofit——FlowCallAdapter的主要内容,如果未能解决你的问题,请参考以下文章