OkHttp源码分析

Posted 我就是马云飞

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了OkHttp源码分析相关的知识,希望对你有一定的参考价值。

本文基于OkHttp 4.9.0分析

OkHttp是什么?

众所周知,OkHttp是一个客户端用来发送HTTP消息并对服务器的响应做出处理的应用层框架。而且现在流行的Retrofit的底层同样也是基于Okhttp的。那么OkHttp有什么优点呢?我们来看下:

  • 无缝的支持GZIP减少数据流量
  • 缓存响应数据减少重复的网络请求
  • 请求失败自动重试主机的其他ip,自动重定向。
  • 如果 HTTP/2 不可用, 使用连接池复用减少请求延迟。

使用方式

OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
      .url(url)
      .build();
//同步
Response response = client.newCall(request).execute()//异步
Response response = client.newCall(request).enqueue()

我们可以看到同步和异步是调用Call的execute和enqueue方法,我们来看下具体的实现:

 override fun execute(): Response 
    check(executed.compareAndSet(false, true))  "Already Executed" 
    timeout.enter()
    callStart()
    try 
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
     finally 
      client.dispatcher.finished(this)
    
  

  override fun enqueue(responseCallback: Callback) 
    check(executed.compareAndSet(false, true))  "Already Executed" 
    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  

可以看出,不管是同步还是异步,都会使用dispatcher,也就是任务分发器。

Dispatcher分发器

前面我们说到了不管同步还是异步,都会使用dispatcher,dispatcher内部会有一个线程池,也就是使用异步请求的时候我们会用到的。我们先来看下Dispatcher内部基本的成员变量:

 //异步请求的最大数量
 var maxRequests = 64
 //每个主机同时请求的最大数量
 var maxRequestsPerHost = 5
 //闲置任务
 var idleCallback: Runnable? = null
 //异步请求线程池
 private var executorServiceOrNull: ExecutorService? = null
 val executorService: ExecutorService
 //异步请求等待队列
 private val readyAsyncCalls = ArrayDeque<AsyncCall>()
 //异步请求执行队列
 private val runningAsyncCalls = ArrayDeque<AsyncCall>()
 //同步请求执行队列
 private val runningSyncCalls = ArrayDeque<RealCall>()

同步请求

 @Synchronized internal fun executed(call: RealCall) 
    runningSyncCalls.add(call)
  

因为是同步请求,所以无需做任何操作,只需要把执行的callback放入同步队列中即可。

异步请求

internal fun enqueue(call: AsyncCall) 
    synchronized(this) 
      readyAsyncCalls.add(call)
      if (!call.call.forWebSocket) 
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      
    
    promoteAndExecute()
  

可以看到执行异步请求的时候,我们先把请求放入等待队列中,然后调用promoteAndExecute。我们看下这段代码是用来干什么的?

private fun promoteAndExecute(): Boolean 
    this.assertThreadDoesntHoldLock()
    
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) 
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) 
        val asyncCall = i.next()
        
        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
        
        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      
      isRunning = runningCallsCount() > 0
    

    for (i in 0 until executableCalls.size) 
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    

    return isRunning
  

当正在执行的请求没有超过最大请求数64个时,并且同一个host的请求没有超过5个时,将它加入到执行队列。开始执行。

当请求执行完成后,还会调用分发器当finish方法,我们看下finish方法:

 /** Used by [AsyncCall.run] to signal completion. */
  internal fun finished(call: AsyncCall) 
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
  

  /** Used by [Call.execute] to signal completion. */
  internal fun finished(call: RealCall) 
    finished(runningSyncCalls, call)
  

  private fun <T> finished(calls: Deque<T>, call: T) 
    val idleCallback: Runnable?
    synchronized(this) 
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    

    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) 
      idleCallback.run()
    
  

不管同步还是异步,执行完成后,都需要从队列中移除,然后判断是否有正在执行的任务,如果没有的话,就执行闲置任务。

请求流程

前面梳理了分发器的同步和异步操作,但是真正的请求流程还是在RealCall中。我们来看下它的同步和异步方法:

 override fun execute(): Response 
    check(executed.compareAndSet(false, true))  "Already Executed" 

    timeout.enter()
    callStart()
    try 
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
     finally 
      client.dispatcher.finished(this)
    
  

可以看到同步请求直接return了getResponseWithInterceptorChain()方法。我们在看看异步的请求:

override fun enqueue(responseCallback: Callback) 
    check(executed.compareAndSet(false, true))  "Already Executed" 

    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  

可以看到,在异步请求的时候丢给了AsyncCall去处理。而AsyncCall是一个Runnable。我们直接看看它的run方法。

 override fun run() 
      threadName("OkHttp $redactedUrl()") 
        var signalledCallback = false
        timeout.enter()
        try 
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
         catch (e: IOException) 
          if (signalledCallback) 
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for $toLoggableString()", Platform.INFO, e)
           else 
            responseCallback.onFailure(this@RealCall, e)
          
         catch (t: Throwable) 
          cancel()
          if (!signalledCallback) 
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          
          throw t
         finally 
          client.dispatcher.finished(this)
        
      
    

可以看到,这里也是getResponseWithInterceptorChain()去拿到它的response。而OkHttp中最核心的也就是这个方法,处理了各种拦截器的逻辑。

拦截器

无论同异步请求都会调用到getResponseWithInterceptorChain(),这个方法主要使用责任链模式将整个请求分为几个拦截器调用,简化了各自的责任和逻辑,而且还可以扩展一些自定义的拦截器。如果不清楚责任链模式,请先查看设计模式之责任链模式

如何进行拦截

前面说到了责任链模式,那我们看下RealCall中是如何进行责任链形式的调用,其实主要方法是在RealInterceptorChain中。我们看下这个方法:


  @Throws(IOException::class)
  override fun proceed(request: Request): Response 
    check(index < interceptors.size)

    calls++

    if (exchange != null) 
      check(exchange.finder.sameHostAndPort(request.url)) 
        "network interceptor $interceptors[index - 1] must retain the same host and port"
      
      check(calls == 1) 
        "network interceptor $interceptors[index - 1] must call proceed() exactly once"
      
    
    //注释1
    // Call the next interceptor in the chain.
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) 
      check(index + 1 >= interceptors.size || next.calls == 1) 
        "network interceptor $interceptor must call proceed() exactly once"
      
    

    check(response.body != null)  "interceptor $interceptor returned a response with no body" 

    return response
  

可以看到注释1处,拿到下一级的拦截器,不断执行它的intercept方法,最后return response 然后一层一层向上反馈数据。

拦截器分析

现在我们来看下拦截器具体的逻辑,直接查看getResponseWithInterceptorChain()这个方法:

  @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response 
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    //自定义的拦截器
    interceptors += client.interceptors
    //处理重定向的后续请求和失败重试
    interceptors += RetryAndFollowUpInterceptor(client)
    //补全请求,处理网络桥接的
    interceptors += BridgeInterceptor(client.cookieJar)
    //处理缓存的
    interceptors += CacheInterceptor(client.cache)
    //处理tcp链接的
    interceptors += ConnectInterceptor
    //处理网络的
    if (!forWebSocket) 
      interceptors += client.networkInterceptors
    
    //处理服务器通信,并封装请求数据与解析响应数据
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try 
      val response = chain.proceed(originalRequest)
      if (isCanceled()) 
        response.closeQuietly()
        throw IOException("Canceled")
      
      return response
     catch (e: IOException) 
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
     finally 
      if (!calledNoMoreExchanges) 
        noMoreExchanges(null)
      
    
  
RetryAndFollowUpInterceptor

这个拦截器主要处理重试以及重定向的,一般情况下,第一次请求不会涉及到,我们先来看下重试的方法。

    try 
          response = realChain.proceed(request)
          newExchangeFinder = true
         catch (e: RouteException) 
          // 尝试通过路由链接失败,请求不会发送
          if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) 
            throw e.firstConnectException.withSuppressed(recoveredFailures)
           else 
            recoveredFailures += e.firstConnectException
          
          newExchangeFinder = false
          continue
         catch (e: IOException) 
          // 与服务器通信的时候发生了异常,请求可能已经发送了
          if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) 
            throw e.withSuppressed(recoveredFailures)
           else 
            recoveredFailures += e
          
          newExchangeFinder = false
          continue
        

可以看到两个异常都是根据recover方法判断是否能够进行重试,如果返回true,则表示允许重试。那么我们来看下recover方法:

private fun recover( e: IOException, call: RealCall, userRequest: Request,
    requestSendStarted: Boolean
  ): Boolean 
    // 本身设置了不允许重试
    if (!client.retryOnConnectionFailure) return false

    // 如果没法再次发送请求
    if (requestSendStarted && requestIsOneShot(e, userRequest)) return false

    // 如果不是重试的异常
    if (!isRecoverable(e, requestSendStarted)) return false

    // 没有其他路径进行链接
    if (!call.retryAfterFailure()) return false

    // For failure recovery, use the same route selector with a new connection.
    return true
  

如果请求结束没有出现异常,不代表当前的响应就是最终交互的,因为我们还需要判断是否需要重定向,而重定向的方法是followUpRequest。我们来看一下:

 @Throws(IOException::class)
  private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? 
    val route = exchange?.connection?.route()
    val responseCode = userResponse.code

    val method = userResponse.request.method
    when (responseCode) 
    //407:使用了代理服务器,让代理服务器授权
      HTTP_PROXY_AUTH -> 
        val selectedProxy = route!!.proxy
        if (selectedProxy.type() != Proxy.Type.HTTP) 
          throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")
        
        return client.proxyAuthenticator.authenticate(route, userResponse)
      
    //401:未经授权,进行授权
      HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)
    //300,301,302,303,307,308: 需要重定向,进行重定向操作。
      HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> 
        return buildRedirectRequest(userResponse, method)
      
    // 408 :请求超时
      HTTP_CLIENT_TIMEOUT -> 
      //如果客户端不允许,直接返回null
        if (!client.retryOnConnectionFailure) 
          return null
        
        //如果尝试了,还是失败,就不管了,返回null
        val requestBody = userResponse.request.body
        if (requestBody != null && requestBody.isOneShot()) 
          return null
        
        val priorResponse = userResponse.priorResponse
        if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) 
          return null
        
        //如果服务器告诉我们重试时间,我们也不管了,返回null
        if (retryAfter(userResponse, 0) > 0) 
          return null
        

        return userResponse.request
      
    //503:服务不可用,但是只在服务器告诉你 Retry-After:0(意思就是立即重试) 才重请求
      HTTP_UNAVAILABLE -> 
        val priorResponse = userResponse.priorResponse
        if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) 
          return null
        

        if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) 
          return userResponse.request
        

        return null
      
    //421:即使域名不同,Okhttp还是可以合并Http2链接,当返回421时,可以用其他链接进行重试。
      HTTP_MISDIRECTED_REQUEST -> 
        val requestBody = userResponse.request.body
        if (requestBody != null && requestBody.isOneShot()) 
          return null
        

        if (exchange == null || !exchange

以上是关于OkHttp源码分析的主要内容,如果未能解决你的问题,请参考以下文章

okhttp源码分析

OkHttp 基本使用&源码分析

OkHttp 基本使用&源码分析

《OkHttp源码分析》之 OkHttp的缓存管理

OKHttp源码分析

OKHttp源码分析