OkHttp源码分析
Posted 我就是马云飞
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了OkHttp源码分析相关的知识,希望对你有一定的参考价值。
本文基于OkHttp 4.9.0分析
OkHttp是什么?
众所周知,OkHttp是一个客户端用来发送HTTP消息并对服务器的响应做出处理的应用层框架。而且现在流行的Retrofit的底层同样也是基于Okhttp的。那么OkHttp有什么优点呢?我们来看下:
- 无缝的支持GZIP减少数据流量
- 缓存响应数据减少重复的网络请求
- 请求失败自动重试主机的其他ip,自动重定向。
- 如果 HTTP/2 不可用, 使用连接池复用减少请求延迟。
- …
使用方式
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url(url)
.build();
//同步
Response response = client.newCall(request).execute();
//异步
Response response = client.newCall(request).enqueue();
我们可以看到同步和异步是调用Call的execute和enqueue方法,我们来看下具体的实现:
override fun execute(): Response
check(executed.compareAndSet(false, true)) "Already Executed"
timeout.enter()
callStart()
try
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
finally
client.dispatcher.finished(this)
override fun enqueue(responseCallback: Callback)
check(executed.compareAndSet(false, true)) "Already Executed"
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
可以看出,不管是同步还是异步,都会使用dispatcher,也就是任务分发器。
Dispatcher分发器
前面我们说到了不管同步还是异步,都会使用dispatcher,dispatcher内部会有一个线程池,也就是使用异步请求的时候我们会用到的。我们先来看下Dispatcher内部基本的成员变量:
//异步请求的最大数量
var maxRequests = 64
//每个主机同时请求的最大数量
var maxRequestsPerHost = 5
//闲置任务
var idleCallback: Runnable? = null
//异步请求线程池
private var executorServiceOrNull: ExecutorService? = null
val executorService: ExecutorService
//异步请求等待队列
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
//异步请求执行队列
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
//同步请求执行队列
private val runningSyncCalls = ArrayDeque<RealCall>()
同步请求
@Synchronized internal fun executed(call: RealCall)
runningSyncCalls.add(call)
因为是同步请求,所以无需做任何操作,只需要把执行的callback放入同步队列中即可。
异步请求
internal fun enqueue(call: AsyncCall)
synchronized(this)
readyAsyncCalls.add(call)
if (!call.call.forWebSocket)
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
promoteAndExecute()
可以看到执行异步请求的时候,我们先把请求放入等待队列中,然后调用promoteAndExecute。我们看下这段代码是用来干什么的?
private fun promoteAndExecute(): Boolean
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this)
val i = readyAsyncCalls.iterator()
while (i.hasNext())
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
isRunning = runningCallsCount() > 0
for (i in 0 until executableCalls.size)
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
return isRunning
当正在执行的请求没有超过最大请求数64个时,并且同一个host的请求没有超过5个时,将它加入到执行队列。开始执行。
当请求执行完成后,还会调用分发器当finish方法,我们看下finish方法:
/** Used by [AsyncCall.run] to signal completion. */
internal fun finished(call: AsyncCall)
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
/** Used by [Call.execute] to signal completion. */
internal fun finished(call: RealCall)
finished(runningSyncCalls, call)
private fun <T> finished(calls: Deque<T>, call: T)
val idleCallback: Runnable?
synchronized(this)
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null)
idleCallback.run()
不管同步还是异步,执行完成后,都需要从队列中移除,然后判断是否有正在执行的任务,如果没有的话,就执行闲置任务。
请求流程
前面梳理了分发器的同步和异步操作,但是真正的请求流程还是在RealCall中。我们来看下它的同步和异步方法:
override fun execute(): Response
check(executed.compareAndSet(false, true)) "Already Executed"
timeout.enter()
callStart()
try
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
finally
client.dispatcher.finished(this)
可以看到同步请求直接return了getResponseWithInterceptorChain()
方法。我们在看看异步的请求:
override fun enqueue(responseCallback: Callback)
check(executed.compareAndSet(false, true)) "Already Executed"
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
可以看到,在异步请求的时候丢给了AsyncCall去处理。而AsyncCall是一个Runnable。我们直接看看它的run方法。
override fun run()
threadName("OkHttp $redactedUrl()")
var signalledCallback = false
timeout.enter()
try
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
catch (e: IOException)
if (signalledCallback)
// Do not signal the callback twice!
Platform.get().log("Callback failure for $toLoggableString()", Platform.INFO, e)
else
responseCallback.onFailure(this@RealCall, e)
catch (t: Throwable)
cancel()
if (!signalledCallback)
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
throw t
finally
client.dispatcher.finished(this)
可以看到,这里也是getResponseWithInterceptorChain()
去拿到它的response。而OkHttp中最核心的也就是这个方法,处理了各种拦截器的逻辑。
拦截器
无论同异步请求都会调用到getResponseWithInterceptorChain()
,这个方法主要使用责任链模式将整个请求分为几个拦截器调用,简化了各自的责任和逻辑,而且还可以扩展一些自定义的拦截器。如果不清楚责任链模式,请先查看设计模式之责任链模式。
如何进行拦截
前面说到了责任链模式,那我们看下RealCall中是如何进行责任链形式的调用,其实主要方法是在RealInterceptorChain中。我们看下这个方法:
@Throws(IOException::class)
override fun proceed(request: Request): Response
check(index < interceptors.size)
calls++
if (exchange != null)
check(exchange.finder.sameHostAndPort(request.url))
"network interceptor $interceptors[index - 1] must retain the same host and port"
check(calls == 1)
"network interceptor $interceptors[index - 1] must call proceed() exactly once"
//注释1
// Call the next interceptor in the chain.
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
if (exchange != null)
check(index + 1 >= interceptors.size || next.calls == 1)
"network interceptor $interceptor must call proceed() exactly once"
check(response.body != null) "interceptor $interceptor returned a response with no body"
return response
可以看到注释1处,拿到下一级的拦截器,不断执行它的intercept方法,最后return response 然后一层一层向上反馈数据。
拦截器分析
现在我们来看下拦截器具体的逻辑,直接查看getResponseWithInterceptorChain()
这个方法:
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
//自定义的拦截器
interceptors += client.interceptors
//处理重定向的后续请求和失败重试
interceptors += RetryAndFollowUpInterceptor(client)
//补全请求,处理网络桥接的
interceptors += BridgeInterceptor(client.cookieJar)
//处理缓存的
interceptors += CacheInterceptor(client.cache)
//处理tcp链接的
interceptors += ConnectInterceptor
//处理网络的
if (!forWebSocket)
interceptors += client.networkInterceptors
//处理服务器通信,并封装请求数据与解析响应数据
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try
val response = chain.proceed(originalRequest)
if (isCanceled())
response.closeQuietly()
throw IOException("Canceled")
return response
catch (e: IOException)
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
finally
if (!calledNoMoreExchanges)
noMoreExchanges(null)
RetryAndFollowUpInterceptor
这个拦截器主要处理重试以及重定向的,一般情况下,第一次请求不会涉及到,我们先来看下重试的方法。
try
response = realChain.proceed(request)
newExchangeFinder = true
catch (e: RouteException)
// 尝试通过路由链接失败,请求不会发送
if (!recover(e.lastConnectException, call, request, requestSendStarted = false))
throw e.firstConnectException.withSuppressed(recoveredFailures)
else
recoveredFailures += e.firstConnectException
newExchangeFinder = false
continue
catch (e: IOException)
// 与服务器通信的时候发生了异常,请求可能已经发送了
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException))
throw e.withSuppressed(recoveredFailures)
else
recoveredFailures += e
newExchangeFinder = false
continue
可以看到两个异常都是根据recover方法判断是否能够进行重试,如果返回true,则表示允许重试。那么我们来看下recover方法:
private fun recover( e: IOException, call: RealCall, userRequest: Request,
requestSendStarted: Boolean
): Boolean
// 本身设置了不允许重试
if (!client.retryOnConnectionFailure) return false
// 如果没法再次发送请求
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
// 如果不是重试的异常
if (!isRecoverable(e, requestSendStarted)) return false
// 没有其他路径进行链接
if (!call.retryAfterFailure()) return false
// For failure recovery, use the same route selector with a new connection.
return true
如果请求结束没有出现异常,不代表当前的响应就是最终交互的,因为我们还需要判断是否需要重定向,而重定向的方法是followUpRequest。我们来看一下:
@Throws(IOException::class)
private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request?
val route = exchange?.connection?.route()
val responseCode = userResponse.code
val method = userResponse.request.method
when (responseCode)
//407:使用了代理服务器,让代理服务器授权
HTTP_PROXY_AUTH ->
val selectedProxy = route!!.proxy
if (selectedProxy.type() != Proxy.Type.HTTP)
throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")
return client.proxyAuthenticator.authenticate(route, userResponse)
//401:未经授权,进行授权
HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)
//300,301,302,303,307,308: 需要重定向,进行重定向操作。
HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER ->
return buildRedirectRequest(userResponse, method)
// 408 :请求超时
HTTP_CLIENT_TIMEOUT ->
//如果客户端不允许,直接返回null
if (!client.retryOnConnectionFailure)
return null
//如果尝试了,还是失败,就不管了,返回null
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot())
return null
val priorResponse = userResponse.priorResponse
if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT)
return null
//如果服务器告诉我们重试时间,我们也不管了,返回null
if (retryAfter(userResponse, 0) > 0)
return null
return userResponse.request
//503:服务不可用,但是只在服务器告诉你 Retry-After:0(意思就是立即重试) 才重请求
HTTP_UNAVAILABLE ->
val priorResponse = userResponse.priorResponse
if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE)
return null
if (retryAfter(userResponse, Integer.MAX_VALUE) == 0)
return userResponse.request
return null
//421:即使域名不同,Okhttp还是可以合并Http2链接,当返回421时,可以用其他链接进行重试。
HTTP_MISDIRECTED_REQUEST ->
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot())
return null
if (exchange == null || !exchange以上是关于OkHttp源码分析的主要内容,如果未能解决你的问题,请参考以下文章