源码解读OkHttp内部剖析探究

Posted 丶笑看退场

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码解读OkHttp内部剖析探究相关的知识,希望对你有一定的参考价值。

OkHttp内部关键在于拦截器的处理来实现,把网络请求封装到各个拦截器来实现,实现了各层的解耦。

我们首先发起一个请求:

//创建okHttpClient对象
OkHttpClient okHttpClient = new OkHttpClient.Builder()
                    .connectTimeout(6, TimeUnit.SECONDS)
                    .readTimeout(6, TimeUnit.SECONDS)
                    .build();

//创建一个Request
Request request = new Request.Builder().url(strUrl).build();
//发起同步请求
try {
            Response response = client.newCall(request).execute();
            return response.body().string();
        } catch (IOException e) {
            e.printStackTrace();
        }

内部请求流程

最终实现的代码是OkHttp会调用newCall,返回一个RealCall对象,并调用execute同步方法。其中RealCall是管理网络请求的类。

## RealCall   
override fun execute(): Response {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    //开始计时,发送
    transmitter.timeoutEnter()
    transmitter.callStart()
    try {
      //适配器请求
      client.dispatcher.executed(this)
      //调用拦截器处理
      return getResponseWithInterceptorChain()
    } finally {
      //适配器结束
      client.dispatcher.finished(this)
    }
  }

知道了这个三个方法后。我们一个个来看~

先看这里调用了dispatcherexcuted方法,那Dispatcher在里面起到什么作用?可以看到有三个数组队列进行处理

### Dispatcher
	//准备异步调用队列
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  //运行异步调用队列
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

 	//运行同步调用队列
  private val runningSyncCalls = ArrayDeque<RealCall>()

在我们上一步过程中,调用了

### Dispatcher

@Synchronized internal fun executed(call: RealCall) {
  //放进正在运行的队列
    runningSyncCalls.add(call)
  }

//Dispath 核心逻辑
//将符合条件的调用从 [readyAsyncCalls] 提升到 [runningAsyncCalls] 并在执行程序服务上运行它们。不得与同步调用,因为执行调用可以调用用户代码。 @return true 如果调度程序当前正在运行调用。
  private fun promoteAndExecute(): Boolean {
    assert(!Thread.holdsLock(this))

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // 大于最大请求request数量
        if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // 大于最大容量
//当一个取出来后,从readyAsyncCalls中删除
        i.remove()
        asyncCall.callsPerHost().incrementAndGet()
        executableCalls.add(asyncCall)
        //移入到runningAsyncCalls中
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    //将executableCalls可执行的调用提交到任务线程池
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

这个方法promoteAndExecute会在入队列和上一个任务执行结束后调用。我们再来看一下里面的executorService线程池的用法。

### Dispatcher
@get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        //用到了一个CachedThreadPool线程池,快速处理大量耗时较短的任务
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

SynchronousQueue同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue。

分析好后,再看第二个方法getResponseWithInterceptorChain()`

### RealCall  
@Throws(IOException::class)
  fun getResponseWithInterceptorChain(): Response {
    // 创建拦截器数组
    val interceptors = mutableListOf<Interceptor>()
    //添加自定义拦截器
    interceptors += client.interceptors】
    //添加重试和重定向拦截器
    interceptors += RetryAndFollowUpInterceptor(client)
    //添加桥拦截器
    interceptors += BridgeInterceptor(client.cookieJar)
    //添加缓存拦截器
    interceptors += CacheInterceptor(client.cache)
    //添加连接池拦截器
    interceptors += ConnectInterceptor
    //添加网络拦截器
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    //添加网络请求拦截器
    interceptors += CallServerInterceptor(forWebSocket)

    //创建拦截器链,所有拦截器的最终调用者
    val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
        client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)

    var calledNoMoreExchanges = false
    try {
      //启动拦截器链
      val response = chain.proceed(originalRequest)
      .....
      return response
    } 
    ......
  }

可以看到addInterceptor(interceptor)所设置的拦截器会在所有其他Intercept处理之前运行。后面就是默认的五个拦截器。

拦截器作用
应用拦截器处理header头信息,
RetryAndFollowUpInterceptor负责出错重试,重定向
BridgeInterceptor填充http请求协议中的head头信息
CacheInterceptor缓存拦截器,如果命中缓存则不会发起网络请求。
ConnectInterceptor连接池拦截器,Okhttp中的核心
networkInterceptors自定义网络拦截器,用于监控网络传输数据
CallServerInterceptor负责和网络的收发

拦截器待会分析,把流程先走完。最终会执行chain.proceed(originalRequest),看下内部实现

  ### RealInterceptorChain
  @Throws(IOException::class)
  fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
    if (index >= interceptors.size) throw AssertionError()

    //当前拦截器调用proceed的次数
    calls++

    //exchange 传输单个 HTTP 请求和响应对
    // 确认传入的请求正在调用,之前的网络拦截器对url或端口进行了修改,
    check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
      "network interceptor ${interceptors[index - 1]} must retain the same host and port"
    }

    // 确认chain.proceed()是唯一调用,在connectInteceptor及其之后的拦截器最多只能调用一次proceed!!
    check(this.exchange == null || calls <= 1) {
      "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
    }

    // 创建下个拦截链处理
    val next = RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
    
    //取出下标为index的拦截器,并调用其intercept方法,将新建的链传入。
    val interceptor = interceptors[index]

    //责任链设计,依次调用拦截器
    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    // 保证在ConnectInterceptor及其之后的拦截器至少调用一次proceed!!
    check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {
      "network interceptor $interceptor must call proceed() exactly once"
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }

责任链在不同拦截器对于proceed的调用次数有不同限制,ConnectInterceptor拦截器及其之后的拦截器能且只能调用一次。因为网络握手、连接、发送请求的工作发生在这些拦截器内,表示正式发出了一次网络请求。而在这之前的拦截器可以执行多次proceed。

其中主要实现了创建下一级责任链,然后取出当前拦截器,调用intercept方法并传入创建的责任链,经过拦截链一级一级的调用,最终执行到CallServerInterceptor的intercept返回Response`对象。

缓存拦截器

要了解缓存拦截器的实现原理,首先就要先知道Http缓存的知识点。Http缓存分为两类(强制缓存和对比缓存)。

彻底弄懂HTTP缓存机制及原理

强制缓存

强制缓存是指网络请求响应header标识了Expires或Cache-Control带了max-age信息,而此时客户端计算缓存并未过期,则可以直接使用本地缓存内容,而不用真正的发起一次网络请求。

对比缓存

浏览器第一次请求数据时,服务器会将缓存标识与数据一起返回给客户端,客户端将二者备份至缓存数据库中。
再次请求数据时,客户端将备份的缓存标识发送给服务器,服务器根据缓存标识进行判断,判断成功后,返回304状态码,通知客户端比较成功,可以使用缓存数据。

在对比缓存中,缓存标识的传递需要重点理解下。

  • Last-Madified: 服务器在响应请求时,告诉浏览器资源的最后修改时间。
  • If-Modified-Since:再次请求服务器时,通过此字段通知服务器上次请求时,服务器返回的资源最后修改时间。
  • Etag:(优先级高于Last-Modified / If-Modified-Since)服务器响应请求时,告诉浏览器当前资源在服务器的唯一标识。
  • If-None-Match:再次请求时,通过此字段通知服务器客户端缓存数据的唯一标识。

对于强制缓存,服务器通知浏览器一个缓存时间,在缓存时间内,下次请求,直接用缓存,不在时间内,执行比较缓存策略。
对于比较缓存,将缓存信息中的Etag和Last-Modified通过请求发送给服务器,由服务器校验,返回304状态码时,浏览器直接使用缓存。

CacheInterceptor会调用intercept方法

###CacheInterceptor

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    //用requestd的url 从缓存中获取响应 
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()

    //根据request 候选Response 获取缓存策略
    //缓存策略 将决定是否使用缓存
    //strategy.networkRequest为null,不使用网络;
    //strategy.cacheResponse为null,不使用缓存
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

    //处理是命中网络还是本地缓存
    //根据缓存策略更新统计指标:请求次数、网络请求次数、使用缓存次数
    cache?.trackResponse(strategy)

    if (cacheCandidate != null && cacheResponse == null) {
      // 有缓存 但不能用,关闭
      cacheCandidate.body?.closeQuietly()
    }

    // 网络和缓存都不能用  返回504
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
    }

    // 如果不用网络 cacheResponse肯定就不为null,那么就使用缓存
    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build()
    }

    //networkRequest 不为null要进行网络请求了,所以 拦截器链 就继续 往下处理了
    var networkResponse: Response? = null
    try {
      networkResponse = chain.proceed(networkRequest)
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }

    // 如果网络请求返回304 标识服务端资源没有修改 
    // 根据网络响应和缓存响应,更新缓存
    if (cacheResponse != null) {
      if (networkResponse?.code == HTTP_NOT_MODIFIED) {
        val response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers, networkResponse.headers))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        networkResponse.body!!.close()

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache!!.trackConditionalCacheHit()
        cache.update(cacheResponse, response)
        return response
      } else {
        //如果不是304,说明服务端资源有更新 就关闭缓存body
        cacheResponse.body?.closeQuietly()
      }
    }

    //将网络数据和缓存传入
    val response = networkResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()

    if (cache != null) {
      //网络响应可缓存 (CacheStrategy.isCacheable对响应处理,根据response的code和response.cacheControl.noStore)
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        // 写入缓存
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response)
      }

      //OkHttp默认只会对get请求进行缓存,因为get请求的数据一般是比较持久的,而post一般是交互操作,没太大意义进行缓存
	  //不是get请求就移除缓存
      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        try {
          cache.remove(networkRequest)
        } catch (_: IOException) {
          // The cache cannot be written.
        }
      }
    }

    return response
  }

用到了使用缓存策略CacheStrategy来确定是否使用缓存。

可以理解为:

  1. 网络和缓存都不能用 ,返回504
  2. 网络networkResponse为null, cacheResponse肯定就不为null,那么就使用缓存
  3. networkResponse不为null,不管cacheResponse是否为null,直接去请求网络
  4. cacheResponse 不为null,如果网络请求返回304 标识服务端资源没有修改
  5. 如果不是304,说明服务端资源有更新,将网络数据和缓存传入
  6. 如果网络响应可缓存,返回cacheWritingResponse
  7. 最后一步,不是get请求就移除缓存
//根据request 候选Response 获取缓存策略
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()

再来看下CacheStrategy中处理了什么?

###CacheStrategy
//给定一个请求和缓存的响应,这将确定是使用网络、缓存还是两者兼而有之。选择缓存策略可能会向请求添加条件(如条件 GET 的“If-Modified-Since”标头)或向缓存响应添加警告(如果缓存数据可能过时)。
//初始化内部
  class Factory(
    private val nowMillis: Long,
    internal val request: Request,
    private val cacheResponse: Response?
  ) {
    //服务缓存响应的服务器时间
    private var servedDate: Date? = null
    private var servedDateString: String? = null

    //缓存响应的最后修改时间
    private var lastModified: Date? = null
    private var lastModifiedString: String? = null

    //缓存响应的到期时间
    private var expires: Date? = null

    //设置的指定  第一次发起缓存的Http请求时的时间戳
    private var sentRequestMillis = 0L

    //第一次收到缓存响应的时间戳
    private var receivedResponseMillis = 0L

    //缓存响应的Etag
    private var etag: String? = null

    //缓存响应的存活时间
    private var ageSeconds = -1

    /**
     * Returns true if computeFreshnessLifetime used a heuristic. If we used a heuristic to serve a
     * cached response older than 24 hours, we are required to attach a warning.
     */
    private fun isFreshnessLifetimeHeuristic(): Boolean {
      return cacheResponse!!.cacheControl.maxAgeSeconds == -1 && expires == null
    }

    init {
      if (cacheResponse != null) {
        //请求时间、响应时间、过期时长、修改时间、资源标记,都是从缓存响应中获取
        this.sentRequestMillis = cacheResponse.sentRequestAtMillis
        this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis
        val headers = cacheResponse.headers
        for (i in 0 until headers.size) {
          val fieldName = headers.name(i)
          val value = headers.value(i)
          when {
            fieldName.equals("Date", ignoreCase = true) -> {
              servedDate = value.toHttpDateOrNull()
              servedDateString = value
            }
            fieldName.equals("Expires", ignoreCase = true) -> {
              expires = value.toHttpDateOrNull()
            }
            fieldName.equals("Last-Modified", ignoreCase = true) -> {
              lastModified = value.toHttpDateOrNull()
              lastModifiedString = value
            }
            fieldName.equals("ETag", ignoreCase = true) -> {
              etag = value
            }
            fieldName.equals("Age"Retrofit2源码解读

Retrofit框架源码解读

带你一步步剖析Retrofit 源码解析:一款基于 OkHttp 实现的网络请求框架

BAT大牛 带你深度剖析Android 10大开源框架

Okhttp的源码解读

Okhttp的源码解读