OkHttp源码解析笔记

Posted 巨头之路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了OkHttp源码解析笔记相关的知识,希望对你有一定的参考价值。

OkHttp源码解析笔记

本篇OkHttp源码基于3.0 Kotlin版本

1.Retrofit的基本使用

首先看OkHttp的基本使用

fun main()
   val okHttpClient = OkHttpClient().newBuilder().build()
   val request = Request.Builder()
            .method("", null)
            .url("")
            .build()
   val call = okHttpClient.newCall(request)
    //同步
   call.execute()
    //异步
   call.enqueue(object: Callback
                override fun onFailure(call: Call, e: IOException) 
                override fun onResponse(call: Call, response: Response) 
            
    )


2.从OkHttp的基本API入手:

从OkHttpClient类的newCall函数出发,看下面代码注释

open class OkHttpClient internal constructor(
 	 builder: Builder
	) : Cloneable, Call.Factory, WebSocket.Factory 

		override fun newCall(request: Request): Call 

    	//RealCall是OkHttp的核心类,
		//分别从RealCall类的enqueue和execute入手看源码
    	return RealCall.newRealCall(this, request, forWebSocket = false)

  		


接着看RealCall类的同步请求函数,将同步请求交给任务调度类Dispatcher

internal class RealCall private constructor(
  	val client: OkHttpClient,val originalRequest: Request,
  	val forWebSocket: Boolean
	) : Call 
		
		override fun execute(): Response 
		    synchronized(this) 
		      check(!executed)  "Already Executed" 
		      executed = true
		    
		    transmitter.timeoutEnter()
		    transmitter.callStart()
		    try 

			  //将同步请求交给任务调度类 Dispatcher
		      client.dispatcher.executed(this) 
			  //核心在这个getResponseWithInterceptorChain函数
		      return getResponseWithInterceptorChain()

		     finally 
		      client.dispatcher.finished(this)
		    
  		


class Dispatcher constructor() 

	@Synchronized internal fun executed(call: RealCall) 
		    //添加到同步运行队列中
		    runningSyncCalls.add(call)
  	


可以看到同步请求走到getResponseWithInterceptorChain函数,这里我们先卡住,先看异步请求

internal class RealCall private constructor(
  	val client: OkHttpClient,val originalRequest: Request,
  	val forWebSocket: Boolean
	) : Call 

		  //http异步执行请求的调用从这里开始
		  override fun enqueue(responseCallback: Callback) 
		    //是否已执行
		    synchronized(this) 
		      check(!executed)  "Already Executed" 
		      executed = true
		    
		    transmitter.callStart()
		    // 将异步任务 交给 Dispatcher(调度器) 去执行
		    client.dispatcher.enqueue(AsyncCall(responseCallback))
		  


可以看到异步请求同样交给任务调度器Dispatcher,接着看Dispatcher类的enqueue函数

class Dispatcher constructor() 

		//okHttp调用的enqueue进行异步请求
		  //实际是调用到这里的Dispatcher(调度器)类的enqueue函数
		internal fun enqueue(call: AsyncCall) 
		    synchronized(this) 
		      //添加到异步等待队列
		      readyAsyncCalls.add(call)
		
		      //这里的forWebSocket变量已经在OkHttpClient的newCall函数中给赋予 false
		      if (!call.get().forWebSocket) 
		        val existingCall = findExistingCallWithHost(call.host())
		          // 使call共享已存在的同一主机
		        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
		      
		    
		    promoteAndExecute() //重点从这里开始

enqueue函数的核心在调用了promoteAndExecute函数,promoteAndExecute函数内的主要工作:

  • 1.遍历等待队列,满足 “运行队列的个数不超过64个,同一个host不超过5条线程执行请求” 的条件下,将等待线程添加到到运行队列中,并且执行该线程
  • 2.当前还有运行请求中的线程 则返回true, 否则返回false
private fun promoteAndExecute(): Boolean 
		    this.assertThreadDoesntHoldLock()
		
		    val executableCalls = mutableListOf<AsyncCall>()
		    val isRunning: Boolean
		    synchronized(this) 
		      val i = readyAsyncCalls.iterator()  //readyAsyncCalls: 异步等待线程队列

		      //对异步等待队列进行迭代
		      while (i.hasNext()) 
		        val asyncCall = i.next()
		        //能进行的最大请求数不能超过  maxRequests = 64 个
		        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
		        // 同一个host最多允许maxRequestsPerHost = 5条线程通知执行请求
		        if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
		
		        i.remove()
		        asyncCall.callsPerHost().incrementAndGet() //同一个host +1 
		        executableCalls.add(asyncCall)     //将异步任务加入执行的集合里
		        runningAsyncCalls.add(asyncCall)   //将异步任务加入运行的队列里
		      
		      isRunning = runningCallsCount() > 0
		    
			//线程池里执行异步请求
		    for (i in 0 until executableCalls.size) 
		      val asyncCall = executableCalls[i]
		      asyncCall.executeOn(executorService)
		    
		
		    return isRunning
		  


接着看asyncCall.executeOn(executorService)这句代码,这句代码是执行 添加到运行队列的线程,可以看到此处的AsyncCall是RealCall类的内部类,AyncCall继承自Runnable

internal class RealCall private constructor(
	  val client: OkHttpClient,  val originalRequest: Request,
	  val forWebSocket: Boolean
	) : Call 

		internal inner class AsyncCall(
		    private val responseCallback: Callback
		  ) : Runnable 
			
			fun executeOn(executorService: ExecutorService) 
	
		      client.dispatcher.assertThreadDoesntHoldLock()
		
		      var success = false
		      try 
		        //线程池里执行线程,执行下面的run函数
		        executorService.execute(this)
		        success = true
		       catch (e: RejectedExecutionException) 
		        val ioException = InterruptedIOException("executor rejected")
		        ioException.initCause(e)
		        transmitter.noMoreExchanges(ioException)
		        responseCallback.onFailure(this@RealCall, ioException)
		       finally 
		        if (!success) // 该线程没有响应 则取消
		          client.dispatcher.finished(this) // This call is no longer running!
		        
		      
		    
		    //核心在run函数的getResponseWithInterceptorChain方法
		    override fun run() 
		      threadName("OkHttp $redactedUrl()") 
		        var signalledCallback = false
		        transmitter.timeoutEnter()
		        try 
		          //这里的getResponseWithInterceptorChain函数是OkHttp的核心,
				  //同步、异步请求都会调用getResponseWithInterceptorChain这个函数
		          val response = getResponseWithInterceptorChain()
		          signalledCallback = true
		          responseCallback.onResponse(this@RealCall, response)
		         catch (e: IOException) 
		          if (signalledCallback) 
		            // Do not signal the callback twice!
		            Platform.get().log("Callback failure for $toLoggableString()", INFO, e)
		           else 
		            responseCallback.onFailure(this@RealCall, e)
		          
		         catch (t: Throwable) 
		          cancel()
		          if (!signalledCallback) 
		            val canceledException = IOException("canceled due to $t")
		            canceledException.addSuppressed(t)
		            responseCallback.onFailure(this@RealCall, canceledException)
		          
		          throw t
		         finally 
		          client.dispatcher.finished(this)
		        
		      
		    
	
		

在这里,异步请求跟同步请求一样执行到getResponseWithInterceptorChain这个函数,接着我们先看下上面代码处的两处finally,都调用到finished

class Dispatcher constructor() 

		internal fun finished(call: AsyncCall) 
			//将运行的地址host集合 减 1
		    call.callsPerHost().decrementAndGet()  
		    finished(runningAsyncCalls, call)
		

		private fun <T> finished(calls: Deque<T>, call: T) 
		    val idleCallback: Runnable?
		    synchronized(this) 
		      //将当前无响应的线程 或 执行完成的线程 进行移除
		      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
		      idleCallback = this.idleCallback
		    

		    //这里之所以执行promoteAndExecute函数,是要检查等待队列是否存在等待线程,如果存在,遍历等待队列的线程并
		    //检验执行队列的线程数是否达到最大数(64),如果没有达到最大数,则将等待线程加入到请求队列中,这样也就确保了等待队列能够
		    //及时添加到执行队列,确保了等待队列的线程能被执行
		    val isRunning = promoteAndExecute()
		
		    if (!isRunning && idleCallback != null) //没有运行的线程
		      idleCallback.run()//启动之前闲置的线程
		    
		 

可以看到Dispatcher类的finished函数主要做了两件事:

  • 移除 无响应线程或执行完成的线程
  • 调用上面提到的promoteAndExecute函数,对等待队列和运行队列进行调度,使得等待线程可以及时被执行

接着回到getResponseWithInterceptorChain这个函数,可以看到主要配置了多个拦截器,而这些拦截器负责对请求数据和响应数据进行处理,内部采用的是责任链模式, 然后创建RealInterceptorChain这个责任链的调度类,调用其proceed函数开启责任链

@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response 
	    // Build a full stack of interceptors.
	    //TODO 责任链
	    val interceptors = mutableListOf<Interceptor>()
	    //TODO 在配置okhttpClient 时设置的intercept 由用户自己设置
	    interceptors += client.interceptors
	    //TODO 负责处理失败后的重试与重定向
	    interceptors += RetryAndFollowUpInterceptor(client)
	    //TODO 深加工用户传递来的请求,设置默认请求头;用户没有设置时默认采用 gzip 压缩解压数据
	    interceptors += BridgeInterceptor(client.cookieJar)
	    //TODO 处理 缓存配置 根据条件(存在响应缓存并被设置为不变的或者响应在有效期内)返回缓存响应
	    //TODO 设置请求头(If-None-Match、If-Modified-Since等) 服务器可能返回304(未修改)
	    //TODO 可配置用户自己设置的缓存拦截器
	    interceptors += CacheInterceptor(client.cache)
	    //TODO 连接服务器,负责和服务器建立连接
	    interceptors += ConnectInterceptor
	    if (!forWebSocket) 
	      //TODO 配置OkhttpClient 时设置的networkInterceptors
	      //TODO 返回观察单个网络请求和响应的不可变拦截器列表。
	      interceptors += client.networkInterceptors
	    
	    //TODO 执行流操作(写出请求体、获得响应数据) 负责向服务器发送请求数据、从服务器读取响应数据
	    //TODO 进行http请求报文的封装与请求报文的解析
	    interceptors += CallServerInterceptor(forWebSocket)
	    //TODO 创建责任链
	    val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
	        client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
	
	    var calledNoMoreExchanges = false
	    try 
	      //TODO 执行责任链
	      val response = chain.proceed(originalRequest)
	      if (transmitter.isCanceled) 
	        response.closeQuietly()
	        throw IOException("Canceled")
	      
	      return response
	     catch (e: IOException) 
	      calledNoMoreExchanges = true
	      throw transmitter.noMoreExchanges(e) as Throwable
	     finally 
	      if (!calledNoMoreExchanges) 
	        transmitter.noMoreExchanges(null)
	      
	    

下面看下CacheInterceptor缓存拦截器部分的实现, 可以看到:

  • 缓存拦截器会根据请求的信息和缓存的响应的信息来判断是否存在缓存可用;
  • 如果有可以使用的缓存,那么就返回该缓存给用户,否则就继续使用责任链模式来从服务器中获取响应;
  • 当获取到响应的时候,又会把响应缓存到磁盘上面,最后返回reseponse
class CacheInterceptor(internal val cache: Cache?): Interceptor 

		  @Throws(IOException::class)
		  override fun intercept(chain: Interceptor.Chain): Response 
		    //根据request得到cache中缓存的response,
		    // 如果用户没有配置缓存拦截器:  cacheCandidate == null,则说明cache(用户自配的缓存拦截器)也是为null
		    val cacheCandidate = cache?.get(chain.request())
		
		    val now = System.currentTimeMillis()
		    // request判断缓存策略,是否使用了网络、缓存或两者都使用
		    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
		    //如果networkRequest == null 则说明不使用网络请求
		    val networkRequest = strategy.networkRequest
		    //获取缓存中(CacheStrategy)的Response
		    val cacheResponse = strategy.cacheResponse
		
		    cache?.trackResponse(strategy)
		
		    //缓存无效 关闭资源
		    if (cacheCandidate != null && cacheResponse == null) 
		      // The cache candidate wasn't applicable. Close it.
		      cacheCandidate.body?.closeQuietly()
		    
		
		    // If we're forbidden from using the network and the cache is insufficient, fail.
		    //networkRequest == null 不使用网路请求 且没有缓存 cacheResponse == null  返回失败
		    if (networkRequest == null && cacheResponse == null) 
		      return Response.Builder()
		          .request(chain.request())
		          .protocol(Protocol.HTTP_1_1)
		          .code(HTTP_GATEWAY_TIMEOUT)
		          .message("Unsatisfiable Request (only-if-cached)")
		          .body(EMPTY_RESPONSE)
		          .sentRequestAtMillis(-1L)
		          .receivedResponseAtMillis(System.currentTimeMillis())
		          .build()
		    
		
		    // If we don't need the network, we're done.
		    //不使用网络请求 且存在缓存 直接返回响应
		    if (networkRequest == null) 
		      return cacheResponse!!.newBuilder()
		          .cacheResponse(stripBody(cacheResponse))
		          .build()
		    
		
		    var networkResponse: Response? = null
		    try 
		      //调用下一个拦截器,从网络上获取数据
		      networkResponse = chain.proceed(networkRequest)
		     finally 
		      // If we're crashing on I/O or otherwise, don't leak the cache body.
		      if (networkResponse == null && cacheCandidate != null) 
		        cacheCandidate.body?.closeQuietly()
		      
		    
		
		    // If we have a cache response too, then we're doing a conditional get.
		    // 如果本地已经存在cacheResponse,那么让它和网络得到的networkResponse做比较,决定是否来更新缓存的cacheResponse
		    if (cacheResponse != null) 
		      //HTTP_NOT_MODIFIED = 304 : 告知客户端,服务端内容未改变,不会返回数据,继续使用上一次的响应的内容
		      if (networkResponse?.code == HTTP_NOT_MODIFIED) 
		        val response = cacheResponse.newBuilder()
		            .headers(combine(cacheResponse.headers, networkResponse.headers))
		            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
		            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
		            .cacheResponse(stripBody(cacheResponse))
		            .networkResponse(stripBody(networkResponse))
		            .build()
		
		        networkResponse.body!!.close()
		
		        // Update the cache after combining headers but before stripping the
		        // Content-Encoding header (as performed by initContentStream()).
		        cache!!.trackConditionalCacheHit()
		        cache.update(cacheResponse, response)  //更新DiskLruCache缓存
		        return response
		       else 
		        cacheResponse.body?.closeQuietly()
		      
		    
		
		    val response = networkResponse!!.newBuilder()
		        .cacheResponse(stripBody(cacheResponse))
		        .networkResponse(stripBody(networkResponse))
		        .build()
		    //cache不为null,说明用户自己配置了缓存拦截器
		    if (cache != null) 
		      // 缓存未经缓存过的response
		      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) 
		        // Offer this request to the cache.
		        val cacheRequest = cache.put(response)
		        return cacheWritingResponse(cacheRequest, response)
		      
		
		      if (HttpMethod.invalidatesCache(networkRequest.method)) 
		        try 
		          cache.remove(networkRequest)
		         catch (_: IOException) 
		          // The cache cannot be written.
		        
		      
		    
		
		    return response
		  

接着看ConnectInterceptor连接拦截器的实现,可以看到

  • 1.判断当前的连接是否可以使用:流是否已经被关闭,并且已经被限制创建新的流;
  • 2.如果当前的连接无法使用,就从连接池中获取一个连接;
  • 3.连接池中也没有发现可用的连接,创建一个新的连接,并进行握手,然后将其放到连接池中。
  • 补充:连接复用的一个好处就是省去了进行 TCP 和 TLS 握手的一个过程。因为建立连接本身也是需要消耗一些时间的,连接被复用之后可以提升我们网络访问的效率
class ExchangeFinder(
	  private val transmitter: Transmitter,
	  private val connectionPool: RealConnectionPool,
	  private val address: Address,
	  private val call: Call,
	  private val eventListener: EventListener
	) 

		  @Throws(IOException::class)
		  private fun findConnection(
		    connectTimeout: Int,
		    readTimeout: Int,
		    writeTimeout: Int,
		    pingIntervalMillis: Int,
		    connectionRetryEnabled: Boolean
		  ): RealConnection 

		    var foundPooledConnection = false
		    var result: RealConnection? = null
		    var selectedRoute: Route? = null
		    var releasedConnection: RealConnection?
		    val toClose: Socket?
		    synchronized(connectionPool) 
		      if (transmitter.isCanceled) throw IOException("Canceled")
		      hasStreamFailure = false // This is a fresh attempt.
		      // 尝试使用已分配的连接,已经分配的连接可能已经被限制创建新的流
		      releasedConnection = transmitter.conn

以上是关于OkHttp源码解析笔记的主要内容,如果未能解决你的问题,请参考以下文章

OkHttp源码解析(小白必看,建议收藏)

OkHttp源码解析(小白必看,建议收藏)

OkHttp-ConnectInterceptor源码解析

OkHttp-ConnectInterceptor源码解析

OkHttp-RetryAndFollowUpInterceptor源码解析

Retrofit源码解析