源码解读OkHttp内部剖析探究
Posted 丶笑看退场
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码解读OkHttp内部剖析探究相关的知识,希望对你有一定的参考价值。
OkHttp内部关键在于拦截器的处理来实现,把网络请求封装到各个拦截器来实现,实现了各层的解耦。
我们首先发起一个请求:
//创建okHttpClient对象
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.connectTimeout(6, TimeUnit.SECONDS)
.readTimeout(6, TimeUnit.SECONDS)
.build();
//创建一个Request
Request request = new Request.Builder().url(strUrl).build();
//发起同步请求
try {
Response response = client.newCall(request).execute();
return response.body().string();
} catch (IOException e) {
e.printStackTrace();
}
内部请求流程
最终实现的代码是OkHttp会调用newCall,返回一个RealCall对象,并调用execute同步方法。其中RealCall
是管理网络请求的类。
## RealCall
override fun execute(): Response {
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
//开始计时,发送
transmitter.timeoutEnter()
transmitter.callStart()
try {
//适配器请求
client.dispatcher.executed(this)
//调用拦截器处理
return getResponseWithInterceptorChain()
} finally {
//适配器结束
client.dispatcher.finished(this)
}
}
知道了这个三个方法后。我们一个个来看~
先看这里调用了dispatcher
的excuted
方法,那Dispatcher
在里面起到什么作用?可以看到有三个数组队列进行处理
### Dispatcher
//准备异步调用队列
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
//运行异步调用队列
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
//运行同步调用队列
private val runningSyncCalls = ArrayDeque<RealCall>()
在我们上一步过程中,调用了
### Dispatcher
@Synchronized internal fun executed(call: RealCall) {
//放进正在运行的队列
runningSyncCalls.add(call)
}
//Dispath 核心逻辑
//将符合条件的调用从 [readyAsyncCalls] 提升到 [runningAsyncCalls] 并在执行程序服务上运行它们。不得与同步调用,因为执行调用可以调用用户代码。 @return true 如果调度程序当前正在运行调用。
private fun promoteAndExecute(): Boolean {
assert(!Thread.holdsLock(this))
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // 大于最大请求request数量
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // 大于最大容量
//当一个取出来后,从readyAsyncCalls中删除
i.remove()
asyncCall.callsPerHost().incrementAndGet()
executableCalls.add(asyncCall)
//移入到runningAsyncCalls中
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
//将executableCalls可执行的调用提交到任务线程池
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
这个方法promoteAndExecute
会在入队列和上一个任务执行结束后调用。我们再来看一下里面的executorService
线程池的用法。
### Dispatcher
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
//用到了一个CachedThreadPool线程池,快速处理大量耗时较短的任务
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
}
return executorServiceOrNull!!
}
SynchronousQueue
同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue。
分析好后,再看第二个方法getResponseWithInterceptorChain()`
### RealCall
@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response {
// 创建拦截器数组
val interceptors = mutableListOf<Interceptor>()
//添加自定义拦截器
interceptors += client.interceptors】
//添加重试和重定向拦截器
interceptors += RetryAndFollowUpInterceptor(client)
//添加桥拦截器
interceptors += BridgeInterceptor(client.cookieJar)
//添加缓存拦截器
interceptors += CacheInterceptor(client.cache)
//添加连接池拦截器
interceptors += ConnectInterceptor
//添加网络拦截器
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
//添加网络请求拦截器
interceptors += CallServerInterceptor(forWebSocket)
//创建拦截器链,所有拦截器的最终调用者
val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
var calledNoMoreExchanges = false
try {
//启动拦截器链
val response = chain.proceed(originalRequest)
.....
return response
}
......
}
可以看到addInterceptor(interceptor)
所设置的拦截器会在所有其他Intercept
处理之前运行。后面就是默认的五个拦截器。
拦截器 | 作用 |
---|---|
应用拦截器 | 处理header头信息, |
RetryAndFollowUpInterceptor | 负责出错重试,重定向 |
BridgeInterceptor | 填充http请求协议中的head头信息 |
CacheInterceptor | 缓存拦截器,如果命中缓存则不会发起网络请求。 |
ConnectInterceptor | 连接池拦截器,Okhttp中的核心 |
networkInterceptors | 自定义网络拦截器,用于监控网络传输数据 |
CallServerInterceptor | 负责和网络的收发 |
拦截器待会分析,把流程先走完。最终会执行chain.proceed(originalRequest)
,看下内部实现
### RealInterceptorChain
@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
if (index >= interceptors.size) throw AssertionError()
//当前拦截器调用proceed的次数
calls++
//exchange 传输单个 HTTP 请求和响应对
// 确认传入的请求正在调用,之前的网络拦截器对url或端口进行了修改,
check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
// 确认chain.proceed()是唯一调用,在connectInteceptor及其之后的拦截器最多只能调用一次proceed!!
check(this.exchange == null || calls <= 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
// 创建下个拦截链处理
val next = RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
//取出下标为index的拦截器,并调用其intercept方法,将新建的链传入。
val interceptor = interceptors[index]
//责任链设计,依次调用拦截器
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
// 保证在ConnectInterceptor及其之后的拦截器至少调用一次proceed!!
check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}
责任链在不同拦截器对于proceed的调用次数有不同限制,ConnectInterceptor
拦截器及其之后的拦截器能且只能调用一次。因为网络握手、连接、发送请求的工作发生在这些拦截器内,表示正式发出了一次网络请求。而在这之前的拦截器可以执行多次proceed。
其中主要实现了创建下一级责任链,然后取出当前拦截器,调用intercept
方法并传入创建的责任链,经过拦截链一级一级的调用,最终执行到CallServerInterceptor的
intercept返回
Response`对象。
缓存拦截器
要了解缓存拦截器的实现原理,首先就要先知道Http缓存的知识点。Http缓存分为两类(强制缓存和对比缓存)。
强制缓存
强制缓存是指网络请求响应header标识了Expires或Cache-Control带了max-age信息,而此时客户端计算缓存并未过期,则可以直接使用本地缓存内容,而不用真正的发起一次网络请求。
对比缓存
浏览器第一次请求数据时,服务器会将缓存标识与数据一起返回给客户端,客户端将二者备份至缓存数据库中。
再次请求数据时,客户端将备份的缓存标识发送给服务器,服务器根据缓存标识进行判断,判断成功后,返回304状态码,通知客户端比较成功,可以使用缓存数据。
在对比缓存中,缓存标识的传递需要重点理解下。
- Last-Madified: 服务器在响应请求时,告诉浏览器资源的最后修改时间。
- If-Modified-Since:再次请求服务器时,通过此字段通知服务器上次请求时,服务器返回的资源最后修改时间。
- Etag:(优先级高于Last-Modified / If-Modified-Since)服务器响应请求时,告诉浏览器当前资源在服务器的唯一标识。
- If-None-Match:再次请求时,通过此字段通知服务器客户端缓存数据的唯一标识。
对于强制缓存,服务器通知浏览器一个缓存时间,在缓存时间内,下次请求,直接用缓存,不在时间内,执行比较缓存策略。
对于比较缓存,将缓存信息中的Etag和Last-Modified通过请求发送给服务器,由服务器校验,返回304状态码时,浏览器直接使用缓存。
在CacheInterceptor
会调用intercept
方法
###CacheInterceptor
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
//用requestd的url 从缓存中获取响应
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
//根据request 候选Response 获取缓存策略
//缓存策略 将决定是否使用缓存
//strategy.networkRequest为null,不使用网络;
//strategy.cacheResponse为null,不使用缓存
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
//处理是命中网络还是本地缓存
//根据缓存策略更新统计指标:请求次数、网络请求次数、使用缓存次数
cache?.trackResponse(strategy)
if (cacheCandidate != null && cacheResponse == null) {
// 有缓存 但不能用,关闭
cacheCandidate.body?.closeQuietly()
}
// 网络和缓存都不能用 返回504
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
}
// 如果不用网络 cacheResponse肯定就不为null,那么就使用缓存
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build()
}
//networkRequest 不为null要进行网络请求了,所以 拦截器链 就继续 往下处理了
var networkResponse: Response? = null
try {
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// 如果网络请求返回304 标识服务端资源没有修改
// 根据网络响应和缓存响应,更新缓存
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response
} else {
//如果不是304,说明服务端资源有更新 就关闭缓存body
cacheResponse.body?.closeQuietly()
}
}
//将网络数据和缓存传入
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
if (cache != null) {
//网络响应可缓存 (CacheStrategy.isCacheable对响应处理,根据response的code和response.cacheControl.noStore)
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// 写入缓存
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response)
}
//OkHttp默认只会对get请求进行缓存,因为get请求的数据一般是比较持久的,而post一般是交互操作,没太大意义进行缓存
//不是get请求就移除缓存
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
用到了使用缓存策略CacheStrategy
来确定是否使用缓存。
可以理解为:
- 网络和缓存都不能用 ,返回504
- 网络networkResponse为null, cacheResponse肯定就不为null,那么就使用缓存
- networkResponse不为null,不管cacheResponse是否为null,直接去请求网络
- cacheResponse 不为null,如果网络请求返回304 标识服务端资源没有修改
- 如果不是304,说明服务端资源有更新,将网络数据和缓存传入
- 如果网络响应可缓存,返回
cacheWritingResponse
- 最后一步,不是get请求就移除缓存
//根据request 候选Response 获取缓存策略
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
再来看下CacheStrategy
中处理了什么?
###CacheStrategy
//给定一个请求和缓存的响应,这将确定是使用网络、缓存还是两者兼而有之。选择缓存策略可能会向请求添加条件(如条件 GET 的“If-Modified-Since”标头)或向缓存响应添加警告(如果缓存数据可能过时)。
//初始化内部
class Factory(
private val nowMillis: Long,
internal val request: Request,
private val cacheResponse: Response?
) {
//服务缓存响应的服务器时间
private var servedDate: Date? = null
private var servedDateString: String? = null
//缓存响应的最后修改时间
private var lastModified: Date? = null
private var lastModifiedString: String? = null
//缓存响应的到期时间
private var expires: Date? = null
//设置的指定 第一次发起缓存的Http请求时的时间戳
private var sentRequestMillis = 0L
//第一次收到缓存响应的时间戳
private var receivedResponseMillis = 0L
//缓存响应的Etag
private var etag: String? = null
//缓存响应的存活时间
private var ageSeconds = -1
/**
* Returns true if computeFreshnessLifetime used a heuristic. If we used a heuristic to serve a
* cached response older than 24 hours, we are required to attach a warning.
*/
private fun isFreshnessLifetimeHeuristic(): Boolean {
return cacheResponse!!.cacheControl.maxAgeSeconds == -1 && expires == null
}
init {
if (cacheResponse != null) {
//请求时间、响应时间、过期时长、修改时间、资源标记,都是从缓存响应中获取
this.sentRequestMillis = cacheResponse.sentRequestAtMillis
this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis
val headers = cacheResponse.headers
for (i in 0 until headers.size) {
val fieldName = headers.name(i)
val value = headers.value(i)
when {
fieldName.equals("Date", ignoreCase = true) -> {
servedDate = value.toHttpDateOrNull()
servedDateString = value
}
fieldName.equals("Expires", ignoreCase = true) -> {
expires = value.toHttpDateOrNull()
}
fieldName.equals("Last-Modified", ignoreCase = true) -> {
lastModified = value.toHttpDateOrNull()
lastModifiedString = value
}
fieldName.equals("ETag", ignoreCase = true) -> {
etag = value
}
fieldName.equals("Age"Retrofit2源码解读