OkHttp源码解析笔记
Posted 巨头之路
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了OkHttp源码解析笔记相关的知识,希望对你有一定的参考价值。
OkHttp源码解析笔记
本篇OkHttp源码基于3.0 Kotlin版本
1.Retrofit的基本使用
首先看OkHttp的基本使用
fun main()
val okHttpClient = OkHttpClient().newBuilder().build()
val request = Request.Builder()
.method("", null)
.url("")
.build()
val call = okHttpClient.newCall(request)
//同步
call.execute()
//异步
call.enqueue(object: Callback
override fun onFailure(call: Call, e: IOException)
override fun onResponse(call: Call, response: Response)
)
2.从OkHttp的基本API入手:
从OkHttpClient类的newCall函数出发,看下面代码注释
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory
override fun newCall(request: Request): Call
//RealCall是OkHttp的核心类,
//分别从RealCall类的enqueue和execute入手看源码
return RealCall.newRealCall(this, request, forWebSocket = false)
接着看RealCall类的同步请求函数,将同步请求交给任务调度类Dispatcher
internal class RealCall private constructor(
val client: OkHttpClient,val originalRequest: Request,
val forWebSocket: Boolean
) : Call
override fun execute(): Response
synchronized(this)
check(!executed) "Already Executed"
executed = true
transmitter.timeoutEnter()
transmitter.callStart()
try
//将同步请求交给任务调度类 Dispatcher
client.dispatcher.executed(this)
//核心在这个getResponseWithInterceptorChain函数
return getResponseWithInterceptorChain()
finally
client.dispatcher.finished(this)
class Dispatcher constructor()
@Synchronized internal fun executed(call: RealCall)
//添加到同步运行队列中
runningSyncCalls.add(call)
可以看到同步请求走到getResponseWithInterceptorChain函数,这里我们先卡住,先看异步请求
internal class RealCall private constructor(
val client: OkHttpClient,val originalRequest: Request,
val forWebSocket: Boolean
) : Call
//http异步执行请求的调用从这里开始
override fun enqueue(responseCallback: Callback)
//是否已执行
synchronized(this)
check(!executed) "Already Executed"
executed = true
transmitter.callStart()
// 将异步任务 交给 Dispatcher(调度器) 去执行
client.dispatcher.enqueue(AsyncCall(responseCallback))
可以看到异步请求同样交给任务调度器Dispatcher,接着看Dispatcher类的enqueue函数
class Dispatcher constructor()
//okHttp调用的enqueue进行异步请求
//实际是调用到这里的Dispatcher(调度器)类的enqueue函数
internal fun enqueue(call: AsyncCall)
synchronized(this)
//添加到异步等待队列
readyAsyncCalls.add(call)
//这里的forWebSocket变量已经在OkHttpClient的newCall函数中给赋予 false
if (!call.get().forWebSocket)
val existingCall = findExistingCallWithHost(call.host())
// 使call共享已存在的同一主机
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
promoteAndExecute() //重点从这里开始
enqueue函数的核心在调用了promoteAndExecute函数,promoteAndExecute函数内的主要工作:
- 1.遍历等待队列,满足 “运行队列的个数不超过64个,同一个host不超过5条线程执行请求” 的条件下,将等待线程添加到到运行队列中,并且执行该线程
- 2.当前还有运行请求中的线程 则返回true, 否则返回false
private fun promoteAndExecute(): Boolean
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this)
val i = readyAsyncCalls.iterator() //readyAsyncCalls: 异步等待线程队列
//对异步等待队列进行迭代
while (i.hasNext())
val asyncCall = i.next()
//能进行的最大请求数不能超过 maxRequests = 64 个
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
// 同一个host最多允许maxRequestsPerHost = 5条线程通知执行请求
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost().incrementAndGet() //同一个host +1
executableCalls.add(asyncCall) //将异步任务加入执行的集合里
runningAsyncCalls.add(asyncCall) //将异步任务加入运行的队列里
isRunning = runningCallsCount() > 0
//线程池里执行异步请求
for (i in 0 until executableCalls.size)
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
return isRunning
接着看asyncCall.executeOn(executorService)这句代码,这句代码是执行 添加到运行队列的线程,可以看到此处的AsyncCall是RealCall类的内部类,AyncCall继承自Runnable
internal class RealCall private constructor(
val client: OkHttpClient, val originalRequest: Request,
val forWebSocket: Boolean
) : Call
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable
fun executeOn(executorService: ExecutorService)
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try
//线程池里执行线程,执行下面的run函数
executorService.execute(this)
success = true
catch (e: RejectedExecutionException)
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
transmitter.noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
finally
if (!success) // 该线程没有响应 则取消
client.dispatcher.finished(this) // This call is no longer running!
//核心在run函数的getResponseWithInterceptorChain方法
override fun run()
threadName("OkHttp $redactedUrl()")
var signalledCallback = false
transmitter.timeoutEnter()
try
//这里的getResponseWithInterceptorChain函数是OkHttp的核心,
//同步、异步请求都会调用getResponseWithInterceptorChain这个函数
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
catch (e: IOException)
if (signalledCallback)
// Do not signal the callback twice!
Platform.get().log("Callback failure for $toLoggableString()", INFO, e)
else
responseCallback.onFailure(this@RealCall, e)
catch (t: Throwable)
cancel()
if (!signalledCallback)
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
throw t
finally
client.dispatcher.finished(this)
在这里,异步请求跟同步请求一样执行到getResponseWithInterceptorChain这个函数,接着我们先看下上面代码处的两处finally,都调用到finished
class Dispatcher constructor()
internal fun finished(call: AsyncCall)
//将运行的地址host集合 减 1
call.callsPerHost().decrementAndGet()
finished(runningAsyncCalls, call)
private fun <T> finished(calls: Deque<T>, call: T)
val idleCallback: Runnable?
synchronized(this)
//将当前无响应的线程 或 执行完成的线程 进行移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
//这里之所以执行promoteAndExecute函数,是要检查等待队列是否存在等待线程,如果存在,遍历等待队列的线程并
//检验执行队列的线程数是否达到最大数(64),如果没有达到最大数,则将等待线程加入到请求队列中,这样也就确保了等待队列能够
//及时添加到执行队列,确保了等待队列的线程能被执行
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) //没有运行的线程
idleCallback.run()//启动之前闲置的线程
可以看到Dispatcher类的finished函数主要做了两件事:
- 移除 无响应线程或执行完成的线程
- 调用上面提到的promoteAndExecute函数,对等待队列和运行队列进行调度,使得等待线程可以及时被执行
接着回到getResponseWithInterceptorChain这个函数,可以看到主要配置了多个拦截器,而这些拦截器负责对请求数据和响应数据进行处理,内部采用的是责任链模式, 然后创建RealInterceptorChain这个责任链的调度类,调用其proceed函数开启责任链
@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response
// Build a full stack of interceptors.
//TODO 责任链
val interceptors = mutableListOf<Interceptor>()
//TODO 在配置okhttpClient 时设置的intercept 由用户自己设置
interceptors += client.interceptors
//TODO 负责处理失败后的重试与重定向
interceptors += RetryAndFollowUpInterceptor(client)
//TODO 深加工用户传递来的请求,设置默认请求头;用户没有设置时默认采用 gzip 压缩解压数据
interceptors += BridgeInterceptor(client.cookieJar)
//TODO 处理 缓存配置 根据条件(存在响应缓存并被设置为不变的或者响应在有效期内)返回缓存响应
//TODO 设置请求头(If-None-Match、If-Modified-Since等) 服务器可能返回304(未修改)
//TODO 可配置用户自己设置的缓存拦截器
interceptors += CacheInterceptor(client.cache)
//TODO 连接服务器,负责和服务器建立连接
interceptors += ConnectInterceptor
if (!forWebSocket)
//TODO 配置OkhttpClient 时设置的networkInterceptors
//TODO 返回观察单个网络请求和响应的不可变拦截器列表。
interceptors += client.networkInterceptors
//TODO 执行流操作(写出请求体、获得响应数据) 负责向服务器发送请求数据、从服务器读取响应数据
//TODO 进行http请求报文的封装与请求报文的解析
interceptors += CallServerInterceptor(forWebSocket)
//TODO 创建责任链
val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
var calledNoMoreExchanges = false
try
//TODO 执行责任链
val response = chain.proceed(originalRequest)
if (transmitter.isCanceled)
response.closeQuietly()
throw IOException("Canceled")
return response
catch (e: IOException)
calledNoMoreExchanges = true
throw transmitter.noMoreExchanges(e) as Throwable
finally
if (!calledNoMoreExchanges)
transmitter.noMoreExchanges(null)
下面看下CacheInterceptor缓存拦截器部分的实现, 可以看到:
- 缓存拦截器会根据请求的信息和缓存的响应的信息来判断是否存在缓存可用;
- 如果有可以使用的缓存,那么就返回该缓存给用户,否则就继续使用责任链模式来从服务器中获取响应;
- 当获取到响应的时候,又会把响应缓存到磁盘上面,最后返回reseponse
class CacheInterceptor(internal val cache: Cache?): Interceptor
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response
//根据request得到cache中缓存的response,
// 如果用户没有配置缓存拦截器: cacheCandidate == null,则说明cache(用户自配的缓存拦截器)也是为null
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
// request判断缓存策略,是否使用了网络、缓存或两者都使用
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
//如果networkRequest == null 则说明不使用网络请求
val networkRequest = strategy.networkRequest
//获取缓存中(CacheStrategy)的Response
val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy)
//缓存无效 关闭资源
if (cacheCandidate != null && cacheResponse == null)
// The cache candidate wasn't applicable. Close it.
cacheCandidate.body?.closeQuietly()
// If we're forbidden from using the network and the cache is insufficient, fail.
//networkRequest == null 不使用网路请求 且没有缓存 cacheResponse == null 返回失败
if (networkRequest == null && cacheResponse == null)
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
// If we don't need the network, we're done.
//不使用网络请求 且存在缓存 直接返回响应
if (networkRequest == null)
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build()
var networkResponse: Response? = null
try
//调用下一个拦截器,从网络上获取数据
networkResponse = chain.proceed(networkRequest)
finally
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null)
cacheCandidate.body?.closeQuietly()
// If we have a cache response too, then we're doing a conditional get.
// 如果本地已经存在cacheResponse,那么让它和网络得到的networkResponse做比较,决定是否来更新缓存的cacheResponse
if (cacheResponse != null)
//HTTP_NOT_MODIFIED = 304 : 告知客户端,服务端内容未改变,不会返回数据,继续使用上一次的响应的内容
if (networkResponse?.code == HTTP_NOT_MODIFIED)
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response) //更新DiskLruCache缓存
return response
else
cacheResponse.body?.closeQuietly()
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
//cache不为null,说明用户自己配置了缓存拦截器
if (cache != null)
// 缓存未经缓存过的response
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest))
// Offer this request to the cache.
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response)
if (HttpMethod.invalidatesCache(networkRequest.method))
try
cache.remove(networkRequest)
catch (_: IOException)
// The cache cannot be written.
return response
接着看ConnectInterceptor连接拦截器的实现,可以看到
- 1.判断当前的连接是否可以使用:流是否已经被关闭,并且已经被限制创建新的流;
- 2.如果当前的连接无法使用,就从连接池中获取一个连接;
- 3.连接池中也没有发现可用的连接,创建一个新的连接,并进行握手,然后将其放到连接池中。
- 补充:连接复用的一个好处就是省去了进行 TCP 和 TLS 握手的一个过程。因为建立连接本身也是需要消耗一些时间的,连接被复用之后可以提升我们网络访问的效率
class ExchangeFinder(
private val transmitter: Transmitter,
private val connectionPool: RealConnectionPool,
private val address: Address,
private val call: Call,
private val eventListener: EventListener
)
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection
var foundPooledConnection = false
var result: RealConnection? = null
var selectedRoute: Route? = null
var releasedConnection: RealConnection?
val toClose: Socket?
synchronized(connectionPool)
if (transmitter.isCanceled) throw IOException("Canceled")
hasStreamFailure = false // This is a fresh attempt.
// 尝试使用已分配的连接,已经分配的连接可能已经被限制创建新的流
releasedConnection = transmitter.conn以上是关于OkHttp源码解析笔记的主要内容,如果未能解决你的问题,请参考以下文章