OKHttp源码分析

Posted 且听真言

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了OKHttp源码分析相关的知识,希望对你有一定的参考价值。

OKHttp作为常用的网络框架,Okhttp是由square公司开发。掌握OKHttp的用法以及它的内部工作原理还是很有必要的。

一、引入方式

 

1.gradle引入

 

   implementation 'com.squareup.okhttp3:okhttp:3.14.7'
    implementation 'com.squareup.okio:okio:1.17.5'

2.权限

<uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
    <uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" />

 

二、使用方式

步骤:

  • 构建OkHttpClient对象。
  • 构建Request对象。
  • 生成Call对象。
  • Call发起同步、异步请求。

 

2.1 Get请求

1.异步请求

 

 val okHttpClient = OkHttpClient()
val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
    .get().build()

val newCall = okHttpClient.newCall(request)
newCall.enqueue(object : Callback 
    override fun onFailure(call: Call, e: IOException) 
        e.printStackTrace()
    

    override fun onResponse(call: Call, response: Response) 
        Log.v(OKHTTP_TAG, response.toString())
        Log.v(OKHTTP_TAG, Thread.currentThread().name)
    
)

2.同步请求

val okHttpClient = OkHttpClient()
val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
    .get().build()

val newCall = okHttpClient.newCall(request)
Thread() 
    try 
        val execute = newCall.execute()
        Log.v(OKHTTP_TAG, execute.toString())
     catch (e: Exception) 
        e.printStackTrace()
    
.start()

 

2.2 Post请求

下面的例子是:

1.post请求提交String。

 

val okHttpClient = OkHttpClient()
val contentType = MediaType.parse("text/x-markdown; charset=utf-8")
val content = "Hello"
val body = RequestBody.create(contentType, content)

val request = Request.Builder()
    .url("https://wanandroid.com/wxarticle/chapters/json")
    .post(body)
    .build()

val newCall = okHttpClient.newCall(request)
newCall.enqueue(object : Callback 
    override fun onFailure(call: Call, e: IOException) 
        e.printStackTrace()
    

    override fun onResponse(call: Call, response: Response) 
        Log.v(OKHTTP_TAG, response.toString())
        Log.v(OKHTTP_TAG, Thread.currentThread().name)
    
)

 

 

2.post请求提交表单。

val okHttpClient = OkHttpClient()
val formBody = FormBody.Builder().add("username", "zy")
    .add("password", "123")
    .build()

val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
    .post(formBody)
    .build()

val newCall = okHttpClient.newCall(request)
newCall.enqueue(object : Callback 
    override fun onFailure(call: Call, e: IOException) 
        e.printStackTrace()
    

    override fun onResponse(call: Call, response: Response) 
        Log.v(OKHTTP_TAG, response.toString())
        Log.v(OKHTTP_TAG, Thread.currentThread().name)
    
)

 

 

3.post请求提交复杂请求体

val okHttpClient = OkHttpClient()
//image file
val imageFile = File(Environment.getExternalStorageDirectory(), "test_ic.png")
//通过RequestBody构建fileBody
val fileBody = RequestBody.create(MediaType.parse("image/jpg"), imageFile)
//MultipartBody构建多种类型(用户名、密码、头像)
val multipartBody = MultipartBody.Builder()
    .setType(MultipartBody.FORM)
    .addFormDataPart("username", "zy")
    .addFormDataPart("phone", "123456")
    .addFormDataPart("Test", "test_ic.png", fileBody)
    .build()

val getRequest = Request.Builder()
    .url("https://wanandroid.com/wxarticle/chapters/json")
    .post(multipartBody)
    .build()

val newCall = okHttpClient.newCall(getRequest)
newCall.enqueue(object : Callback 
    override fun onFailure(call: Call, e: IOException) 
        e.printStackTrace()
    

    override fun onResponse(call: Call, response: Response) 
        Log.v(OKHTTP_TAG, response.toString())
        Log.v(OKHTTP_TAG, Thread.currentThread().name)
    
)

 

 

2.3 请求配置项 

下面的例子中设置了请求的超时时长、缓存位置以及大小、监控OKHttp发出的请求(全局配置)

val okHttpClient = OkHttpClient.Builder()
    .connectTimeout(15, TimeUnit.SECONDS)
    .readTimeout(10, TimeUnit.SECONDS)
    .writeTimeout(10, TimeUnit.SECONDS)
    .cache(Cache(externalCacheDir, 500 * 1024 * 1024))
    .addInterceptor 
        val request = it.request()
        val url = request.url().toString()
        Log.v(
            OKHTTP_TAG,
            "intercept:proceed start: url" + url + ", at " + System.currentTimeMillis()
        )
        val response = it.proceed(request)
        val body = response.body()
        Log.v(
            OKHTTP_TAG,
            "intercept:proceed end: url" + url + ", at " + System.currentTimeMillis()
        )
        Log.v(OKHTTP_TAG, body?.toString() ?: "")
        response
    .build()

val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
    .get().build()

val newCall = okHttpClient.newCall(request)
newCall.enqueue(object : Callback 
    override fun onFailure(call: Call, e: IOException) 
        e.printStackTrace()
    

    override fun onResponse(call: Call, response: Response) 
        Log.v(OKHTTP_TAG, response.toString())
        Log.v(OKHTTP_TAG, Thread.currentThread().name)
    
)

请求单独的配置

下面的例子中:

使用addHeader()方法添加了请求头。

使用cacheControl(CacheControl.FORCE_NETWORK)设置此次请求是能使用网络,不用缓存。

val okHttpClient = OkHttpClient()

val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
    .get()
    .addHeader("key", "value")
    .cacheControl(CacheControl.FORCE_CACHE)
    .build()

val newCall = okHttpClient.newCall(request)
newCall.enqueue(object :Callback 
    override fun onFailure(call: Call, e: IOException) 
        e.printStackTrace()
    

    override fun onResponse(call: Call, response: Response) 
        Log.v(OKHTTP_TAG, response.toString())
        Log.v(OKHTTP_TAG, Thread.currentThread().name)
    
)

 

 

三、OKHTTP请求流程

通过一个简单的异步请求分析下OKHttp的请求流程。

val okHttpClient = OkHttpClient()
val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
    .get().build()

val newCall = okHttpClient.newCall(request)
newCall.enqueue(object : Callback 
    override fun onFailure(call: Call, e: IOException) 
        e.printStackTrace()
    

    override fun onResponse(call: Call, response: Response) 
        Log.v(OKHTTP_TAG, response.toString())
        Log.v(OKHTTP_TAG, Thread.currentThread().name)
    
)

 

1.请求的创建

1.1OKHttpClient创建

 

通过OkHttpClient构造函数构建实例,配置项就是Builder构造方法中默认值。

OKHttpClient.Builder的构造函数

public Builder() 
  dispatcher = new Dispatcher();//分发器,主要用来执行异步请求时的策略
  protocols = DEFAULT_PROTOCOLS;//http协议
  connectionSpecs = DEFAULT_CONNECTION_SPECS;//连接配置
  eventListenerFactory = EventListener.factory(EventListener.NONE);//请求监听工厂
  proxySelector = ProxySelector.getDefault();//代理选择器
  if (proxySelector == null) 
    proxySelector = new NullProxySelector();
  
  cookieJar = CookieJar.NO_COOKIES;//cookie
  socketFactory = SocketFactory.getDefault();//Socket工厂
  hostnameVerifier = OkHostnameVerifier.INSTANCE;//主机name验证
  certificatePinner = CertificatePinner.DEFAULT;//证书链
  proxyAuthenticator = Authenticator.NONE;//代理服务器身份验证
  authenticator = Authenticator.NONE;//源服务器身份验证
  connectionPool = new ConnectionPool();//连接池
  dns = Dns.SYSTEM;//dns域名
  followSslRedirects = true;//是否遵循 ssl 重定向
  followRedirects = true;//是否遵循重定向
  retryOnConnectionFailure = true;//连接失败的时候是否重试
  callTimeout = 0;//请求超时
  connectTimeout = 10_000;//连接超时
  readTimeout = 10_000;//读超时
  writeTimeout = 10_000;//写超时
  pingInterval = 0;//ping间隔时间

 

1.2Request实例创建

Request也同样是使用Builder模式创建

 

 

 

 

 

 

// 请求链接
  HttpUrl url;
  // 请求方法
  String method;
  // 请求头
  Headers.Builder headers;
  // 请求体
  RequestBody body;
    // 标签
  Map<Class<?>, Object> tags = Collections.emptyMap();

 

 

1.3 Call对象的创建

 

 

 

public interface Call extends Cloneable 
 
  //请求
  Request request();
  //同步请求
  Response execute() throws IOException;
  //异步请求
  void enqueue(Callback responseCallback);
  //取消请求
  void cancel();

 //是否在请求过程中
  boolean isExecuted();
 //请求是否取消
  boolean isCanceled();


  Timeout timeout();

  
  Call clone();
  //工厂接口
  interface Factory 
    Call newCall(Request request);
  

 

private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) 
  //OKHttpClient 实例
  this.client = client;
  //最初的Request
  this.originalRequest = originalRequest;
  //是否支持websocket通信
  this.forWebSocket = forWebSocket;

 

Transmitter内部持有OkHttpClient、连接池、call、事件监听器。

public Transmitter(OkHttpClient client, Call call) 
  this.client = client; //OKHttpClient
  //连接池
  this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
  //Call实例
  this.call = call;
  //事件监听
  this.eventListener = client.eventListenerFactory().create(call);
  this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);

 

 

 

2.请求调度

2.1异步请求

 

 

@Override public void enqueue(Callback responseCallback) 
  synchronized (this) 
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  
  transmitter.callStart();//回调 请求监听器的开始
  client.dispatcher().enqueue(new AsyncCall(responseCallback));//开始调度

 

首先判断是否已请求过,回调请求开始。然后调用Dispatcher的enqueue方法,传入参数AsyncCall。AsynCall本质是一个Runnable。

final class AsyncCall extends NamedRunnable 
    ... ...
public abstract class NamedRunnable implements Runnable 
  protected final String name;

  public NamedRunnable(String format, Object... args) 
    this.name = Util.format(format, args);
  

  @Override public final void run() 
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try 
      execute();
     finally 
      Thread.currentThread().setName(oldName);
    
  

  protected abstract void execute();

 

 

#Dispatcher

请求放入双端队列readyAsyncCalls中,再从 正在执行的请求runningAsyncCalls 或 等待执行的请求readyAsyncCalls 中找到是相同host的请求,把callsPerHost重用给当前请求。callsPerHost看名字感觉像是 拥有相同host的请求的数量,类型是AtomicInteger。

void enqueue(AsyncCall call) 
  synchronized (this) 
      //将AsyncCall加入等待执行的队列
    readyAsyncCalls.add(call);

    // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
    // the same host.
    if (!call.get().forWebSocket) 
        //从 runningAsyncCalls或者readyAsyncCalls中找到相同host的请求
        //相同的 host,使用相同的host calls计数器
      AsyncCall existingCall = findExistingCallWithHost(call.host());
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
    
  
  promoteAndExecute();

 

//从 runningAsyncCalls或者readyAsyncCalls中找到相同host的请求
@Nullable private AsyncCall findExistingCallWithHost(String host) 
  for (AsyncCall existingCall : runningAsyncCalls) 
    if (existingCall.host().equals(host)) return existingCall;
  
  for (AsyncCall existingCall : readyAsyncCalls) 
    if (existingCall.host().equals(host)) return existingCall;
  
  return null;

 

#Dispatcher
private int maxRequests = 64; //最多同时请求数为64
private int maxRequestsPerHost = 5;//每个主机最多同时请求数为5

//注意使用的是双端队列

/** 待运行的异步任务队列 */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

/** 正在运行的异步任务队列*/
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

/** 正在运行的同步任务队列 */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
private boolean promoteAndExecute() 
  assert (!Thread.holdsLock(this));

  List<AsyncCall> executableCalls = new ArrayList<>();
  boolean isRunning;
  synchronized (this) 
    //遍历等待执行的异步任务队列
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) 
        //得到一个待执行的AsyncCall
      AsyncCall asyncCall = i.next();
        //当最大并发数小于64以及当前请求的主机的请求数小于5时,添加到处理中的异步请求队列
      if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
      if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
        
      i.remove();
      //该host下的calls计数器加1
      asyncCall.callsPerHost().incrementAndGet();
      executableCalls.add(asyncCall);
      runningAsyncCalls.add(asyncCall);
    
    isRunning = runningCallsCount() > 0;
  

  for (int i = 0, size = executableCalls.size(); i < size; i++) 
    AsyncCall asyncCall = executableCalls.get(i);
    asyncCall.executeOn(executorService());//执行请求
  

  return isRunning;

 

遍历readyAsyncCalls,判断如果runningAsyncCalls数量大于最大并发请求数64就break,或者 相同host请求的数量大于5,就continue。如果不满足上面的两种情况,就从等待队列readyAsyncCalls中移除,callsPerHost自增1,放入 集合executableCalls中,并添加到队列runningAsyncCalls中,表示正在执行的异步请求。readyAsyncCalls的价值在于控制最大并发数的缓冲:异步请求并发数达到64、相同host的异步请求达到5,都要放入等待队列。

 

#RealCall

void executeOn(ExecutorService executorService) 
  assert (!Thread.holdsLock(client.dispatcher()));
  boolean success = false;
  try 
      //通过传入的线程池执行AsyncCall
    executorService.execute(this);
    success = true;
   catch (RejectedExecutionException e) 
    InterruptedIOException ioException = new InterruptedIOException("executor rejected");
    ioException.initCause(e);
    transmitter.noMoreExchanges(ioException);
    responseCallback.onFailure(RealCall.this, ioException);//回调失败
   finally 
    if (!success) 
      client.dispatcher().finished(this); // This call is no longer running!
    
  

 

#Dispatcher

该线程池与CachedThreadPool的线程池 执行请求RealCall非常相似。

public synchronized ExecutorService executorService() 
  if (executorService == null) 
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
  
  return executorService;

 

#AsyncCall 

@Override protected void execute() 
  boolean signalledCallback = false;
  //超时计时
  transmitter.timeoutEnter();
  try 
      //获取请求Response
    Response response = getResponseWithInterceptorChain();
    signalledCallback = true;
    //回调请求结果
    responseCallback.onResponse(RealCall.this, response);
   catch (IOException e) 
    if (signalledCallback) 
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
     else 
        //走失败回调
      responseCallback.onFailure(RealCall.this, e);
    
   catch (Throwable t) 
    cancel();
    if (!signalledCallback) 
      IOException canceledException = new IOException("canceled due to " + t);
      canceledException.addSuppressed(t);
      // 走失败回调
      responseCallback.onFailure(RealCall.this, canceledException);
    
    throw t;
   finally 
      //结束请求
    client.dispatcher().finished(this);
  

 调用getResponseWithInterceptorChain()方法来获取Response,使用responseCallback回调结果,最后请求结束调用了dispatcher的finish方法。

 

3.请求的执行

执行请求最终都会执行到RealCall的getResponseWithInterceptorChain()方法中

Response getResponseWithInterceptorChain() throws IOException 
  // Build a full stack of interceptors.
  List<Interceptor> interceptors = new ArrayList<>();
  //应用拦截器(用户自定义拦截器)
  interceptors.addAll(client.interceptors());
  //重试及重定向拦截器
  interceptors.add(new RetryAndFollowUpInterceptor(client));
  //桥接拦截器
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  //缓存拦截器
  interceptors.add(new CacheInterceptor(client.internalCache()));
  //连接拦截器
  interceptors.add(new ConnectInterceptor(client));
  if (!forWebSocket) 
      //网络拦截器(用户自定义)
    interceptors.addAll(client.networkInterceptors());
  
  //请求服务拦截器
  interceptors.add(new CallServerInterceptor(forWebSocket));
  //拦截器链
  Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
      originalRequest, this, client.connectTimeoutMillis(),
      client.readTimeoutMillis(), client.writeTimeoutMillis());

  boolean calledNoMoreExchanges = false;
  try 
    Response response = chain.proceed(originalRequest);
    if (transmitter.isCanceled()) 
      closeQuietly(response);
      throw new IOException("Canceled");
    
    return response;
   catch (IOException e) 
    calledNoMoreExchanges = true;
    throw transmitter.noMoreExchanges(e);
   finally 
    if (!calledNoMoreExchanges) 
      transmitter.noMoreExchanges(null);
    
  
package okhttp3;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

public interface Interceptor 
  Response intercept(Chain chain) throws IOException;

  interface Chain 
    Request request();

    Response proceed(Request request) throws IOException;

 
    @Nullable Connection connection();

    Call call();

    int connectTimeoutMillis();

    Chain withConnectTimeout(int timeout, TimeUnit unit);

    int readTimeoutMillis();

    Chain withReadTimeout(int timeout, TimeUnit unit);

    int writeTimeoutMillis();

    Chain withWriteTimeout(int timeout, TimeUnit unit);
  

 

 

 

Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
    originalRequest, this, client.connectTimeoutMillis(),
    client.readTimeoutMillis(), client.writeTimeoutMillis());

 

 

在实例化RealInterceptorChain时 index赋值是0,exchange是null,首先获取了第一个拦截器,调用了它的interceptor方法,并返回和校验了结果。

// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
    index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);

调用应用拦截器的interceptor方法传入的参数:拦截器链实例next,next就是把index + 1。在应用拦截器中调用的是 next的proceed方法。

最后一个拦截器CallServerInterceptor之外,所有拦截器的interceptor方法都调用了 传入RealInterceptorChain实例。每个拦截器interceptor方法在 调用chain的proceed方法拿到下一个拦截器处理的response,然后返回给上一个拦截器。

 

4.拦截器

所有的拦截器会组合成一个拦截器链,以典型的责任链模式,依次执行每个拦截器。不同的拦截器有不同的职责,链上的拦截器会按顺序挨个处理,在Request发出之前,Response返回之前,插入一些定制逻辑,这样可以方便的扩展需求。

 

#RealCall

Response getResponseWithInterceptorChain() throws IOException 
  // Build a full stack of interceptors.
  List<Interceptor> interceptors = new ArrayList<>();
  interceptors.addAll(client.interceptors());
  interceptors.add(new RetryAndFollowUpInterceptor(client));
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  interceptors.add(new CacheInterceptor(client.internalCache()));
  interceptors.add(new ConnectInterceptor(client));
  if (!forWebSocket) 
    interceptors.addAll(client.networkInterceptors());
  
  interceptors.add(new CallServerInterceptor(forWebSocket));

  Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
      originalRequest, this, client.connectTimeoutMillis(),
      client.readTimeoutMillis(), client.writeTimeoutMillis());

  boolean calledNoMoreExchanges = false;
  try 
    Response response = chain.proceed(originalRequest);
    if (transmitter.isCanceled()) 
      closeQuietly(response);
      throw new IOException("Canceled");
    
    return response;
   catch (IOException e) 
    calledNoMoreExchanges = true;
    throw transmitter.noMoreExchanges(e);
   finally 
    if (!calledNoMoreExchanges) 
      transmitter.noMoreExchanges(null);
    
  

 

 

 

 

 

1.RetryAndFollowUpInterceptor

 

 

 

@Override public Response intercept(Chain chain) throws IOException 
  Request request = chain.request();
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Transmitter transmitter = realChain.transmitter();

  int followUpCount = 0;
  Response priorResponse = null;
  while (true) 
      //准备连接
    transmitter.prepareToConnect(request);

    if (transmitter.isCanceled()) 
      throw new IOException("Canceled");
    

    Response response;
    boolean success = false;
    try 
        //继续执行下一个Interceptor
      response = realChain.proceed(request, transmitter, null);
      success = true;
     catch (RouteException e) 
      // The attempt to connect via a route failed. The request will not have been sent.
      //连接路由异常,此时请求还未发送。
      if (!recover(e.getLastConnectException(), transmitter, false, request)) 
        throw e.getFirstConnectException();
      
      continue;
     catch (IOException e) 
      // An attempt to communicate with a server failed. The request may have been sent.
      boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
      //IO异常,请求可能已经发出。
      if (!recover(e, transmitter, requestSendStarted, request)) throw e;
      continue;
     finally 
      // The network call threw an exception. Release any resources.
      if (!success) 
          //请求没成功,释放资源。
        transmitter.exchangeDoneDueToException();
      
    

    // Attach the prior response if it exists. Such responses never have a body.
    // 关联上一个response
    if (priorResponse != null) 
      response = response.newBuilder()
          .priorResponse(priorResponse.newBuilder()
                  .body(null)
                  .build())
          .build();
    

    Exchange exchange = Internal.instance.exchange(response);
    Route route = exchange != null ? exchange.connection().route() : null;
    //跟进结果,主要作用是根据响应码处理请求,返回Request不为空时进行重定向处理-拿到重定向的request。
     //获取响应码判断是否需要重定向
    Request followUp = followUpRequest(response, route);
    //followUp为空,不需要重试,直接返回
    if (followUp == null) 
      if (exchange != null && exchange.isDuplex()) 
        transmitter.timeoutEarlyExit();
      
      return response;
    

    RequestBody followUpBody = followUp.body();
    if (followUpBody != null && followUpBody.isOneShot()) 
      return response;
    

    closeQuietly(response.body());
    if (transmitter.hasExchange()) 
      exchange.detachWithViolence();
    
    //最多重试20次
    if (++followUpCount > MAX_FOLLOW_UPS) 
      throw new ProtocolException("Too many follow-up requests: " + followUpCount);
    

    request = followUp;
    priorResponse = response;
  

 

2.BridgeInterceptor

@Override public Response intercept(Chain chain) throws IOException 
  Request userRequest = chain.request();
  Request.Builder requestBuilder = userRequest.newBuilder();

  RequestBody body = userRequest.body();
  if (body != null) 
    MediaType contentType = body.contentType();
    if (contentType != null) 
      requestBuilder.header("Content-Type", contentType.toString());
    

    long contentLength = body.contentLength();
    //获取RequestBody的长度,添加Header "Content-Length"
    if (contentLength != -1) 
      requestBuilder.header("Content-Length", Long.toString(contentLength));
      requestBuilder.removeHeader("Transfer-Encoding");
     else 
        //否则添加"Transfer-Encoding"
      requestBuilder.header("Transfer-Encoding", "chunked");
      requestBuilder.removeHeader("Content-Length");
    
  

  if (userRequest.header("Host") == null) 
    requestBuilder.header("Host", hostHeader(userRequest.url(), false));
  

  if (userRequest.header("Connection") == null) 
    requestBuilder.header("Connection", "Keep-Alive");
  

  // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
  // the transfer stream.
  //"Accept-Encoding: gzip",接收 返回gzip编码压缩的数据
  //如果手动添加了"Accept-Encoding: gzip",则不会进入下面的if语句,transparentGzip为false,需要我们自己处理数据解压。
  //如果 没有 手动添加“Accept-Encoding: gzip”,transparentGzip是true,就会自动添加,后续也会自动解压处理。
  boolean transparentGzip = false;
  if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) 
    transparentGzip = true;
    requestBuilder.header("Accept-Encoding", "gzip");
  
    //从cookiejar中获取cookie,添加到Header。
  List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
  if (!cookies.isEmpty()) 
    requestBuilder.header("Cookie", cookieHeader(cookies));
  
    //"User-Agent"需要作为公共Header外部统一添加
  if (userRequest.header("User-Agent") == null) 
    requestBuilder.header("User-Agent", Version.userAgent());
  

    //交给下一个拦截器去处理请求
  Response networkResponse = chain.proceed(requestBuilder.build());
   //从networkResponse中获取header "Set-Cookie" 存入 cookieJar
  HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

  Response.Builder responseBuilder = networkResponse.newBuilder()
      .request(userRequest);
  //如果没有手动添加“Accept-Encoding: gzip”,会自动创建 能自动解压的responseBody-GzipSource
  if (transparentGzip
      && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
      && HttpHeaders.hasBody(networkResponse)) 
    GzipSource responseBody = new GzipSource(networkResponse.body().source());
    Headers strippedHeaders = networkResponse.headers().newBuilder()
        .removeAll("Content-Encoding")
        .removeAll("Content-Length")
        .build();
    responseBuilder.headers(strippedHeaders);
    String contentType = networkResponse.header("Content-Type");
    responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
  
  //构建新的 Response 返回出去
  return responseBuilder.build();

 

BridgeInterceptor拦截器,对请求添加了header:“Content-Type”、“Content-Length” 或 “Transfer-Encoding”、“Host”、“Connection”、“Accept-Encoding”、“Cookie”、“User-Agent”,即网络层真正可执行的请求。其中,默认是没有cookie处理的,需要我们在初始化OkhttpClient时配置我们自己的cookieJar。

获取响应后,先把响应header中的cookie存入cookieJar,然后如果没有手动添加请求header “Accept-Encoding: gzip”,那么会通过 创建能自动解压的responseBody——GzipSource,接着构建新的response返回。

 

3.CacheInterceptor

 

CacheInterceptor,缓存拦截器,提供网络请求缓存的存取。通过CacheInterceptor合理使用本地缓存,可以有效地减少网络开销、减少响应延迟。

 

1.了解下http缓存机制:

第一次请求:

 

第二次请求:

 

 

 

 

@Override public Response intercept(Chain chain) throws IOException 
    //用request的url 从缓存中 获取响应 作为候选(CacheStrategy决定是否使用)
  Response cacheCandidate = cache != null
      ? cache.get(chain.request())
      : null;

  long now = System.currentTimeMillis();
    //根据 request 、 候选response 获取缓存策略
    //缓存策略 将决定是否 使用缓存:strategy.networkRequest为 null,不使用网络;strategy.cacheResponse为null,不使用缓存。
   //缓存策略 有两种类型
   //networkRequest 网络请求
   //cacheResponse  缓存的响应
   CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
  Request networkRequest = strategy.networkRequest;
  Response cacheResponse = strategy.cacheResponse;
  
  //根据缓存策略更新统计指标:请求次数、网络请求次数、使用缓存次数
  if (cache != null) 
      //计算请求次数 和 缓存次数
    cache.trackResponse(strategy);
  
  //有缓存 但不能用,关闭
  if (cacheCandidate != null && cacheResponse == null) 
    closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
  

  // If we're forbidden from using the network and the cache is insufficient, fail.
   //网络请求、缓存 都不能用,就返回 504 
   //如果 禁止使用网络 并且 缓存不足,返回504和空body的Response
   if (networkRequest == null && cacheResponse == null) 
    return new Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(504)
        .message("Unsatisfiable Request (only-if-cached)")
        .body(Util.EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
  

  // If we don't need the network, we're done.
  //如果不用网络, cacheResponse肯定不为空了,那么即使用缓存,结束了,不会走后面的拦截器了。
  //如果策略中不能使用网络,就把缓存的response封装返回
   if (networkRequest == null) 
    return cacheResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build();
  
 //到这里,networkRequest != null(cacheResponse可能为null, 也可能不为null)
 //networkRequest != null,就是要进行网络请求了,所以拦截器 就继续往下处理了。
  Response networkResponse = null;
  try 
      //调用拦截器链的process从网络获取数据
    networkResponse = chain.proceed(networkRequest);
   finally 
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) 
      closeQuietly(cacheCandidate.body());
    
  

  // If we have a cache response too, then we're doing a conditional get.
  //如果网络请求返回304,表示服务端资源没有修改,那么就结合 网络响应和缓存响应,然后更新缓存,返回,结束。
  //如果有缓存的Response
  if (cacheResponse != null) 
      //如果网络请求返回 code为304,即说明资源未修改
    if (networkResponse.code() == HTTP_NOT_MODIFIED) 
        //直接封装缓存的Response返回即可
      Response response = cacheResponse.newBuilder()
          .headers(combine(cacheResponse.headers(), networkResponse.headers()))//结合header
          .sentRequestAtMillis(networkResponse.sentRequestAtMillis())//请求事件
          .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())//接受事件
          .cacheResponse(stripBody(cacheResponse))
          .networkResponse(stripBody(networkResponse))
          .build();
      networkResponse.body().close();

      // Update the cache after combining headers but before stripping the
      // Content-Encoding header (as performed by initContentStream()).
      cache.trackConditionalCacheHit();
      cache.update(cacheResponse, response);
      return response;
     else 
        //如果是非304,说明服务端资源有更新,就关闭缓存body
      closeQuietly(cacheResponse.body());
    
  

  Response response = networkResponse.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .networkResponse(stripBody(networkResponse))
      .build();

  if (cache != null) 
      //网络响应可缓存(请求和响应 的 头 Cache-Control都不是 “no-store”)
    //判断是否具有主体 并且 是否可以缓存供后续使用
    if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) 
      // Offer this request to the cache.
      // 写入缓存
      //加入缓存
      CacheRequest cacheRequest = cache.put(response);
      return cacheWritingResponse(cacheRequest, response);
    
    //OkHttp默认只会对 get请求进行缓存,因为get请求的数据一般比较持久
    //而post一般是交互操作,没太大意义进行缓存
    //不是get请求就移除缓存
    //如果请求方法无效,就从缓存中remove掉
    if (HttpMethod.invalidatesCache(networkRequest.method())) 
      try 
        cache.remove(networkRequest);
       catch (IOException ignored) 
        // The cache cannot be written.
      
    
  

  return response;

我们通常会使用OkHttp 中使用网络缓存,提高访问效率。

  • 有网络的时候:短时间内频繁的请求,后面的请求使用缓存中的资源。
  • 无网络的时候:获取之前缓存的数据进行暂时的页面显示,当网络更新时对当前activity的数据进行刷新,刷新界面,避免界面空白的场景。

 

 

3.ConnectInterceptor

这个拦截器是关于TCP 连接的。连接拦截器(ConnectInterceptor)就是找到一个可用连接,也就是TCP连接,这个连接就是用于HTTP请求和响应的。

  • 打开指定服务器的网络连接
  • 交给下一个拦截器CallServerInterceptor来处理请求和获取数据的逻辑

 

 

 

@Override public Response intercept(Chain chain) throws IOException 
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Request request = realChain.request();
  Transmitter transmitter = realChain.transmitter();

  // We need the network to satisfy this request. Possibly for validating a conditional GET.
  // 如果请求是GET格式,需要一些额外的检查
  boolean doExtensiveHealthChecks = !request.method().equals("GET");
  Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

  return realChain.proceed(request, transmitter, exchange);
/** Returns a new exchange to carry a new request and response. */
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) 
  synchronized (connectionPool) 
    if (noMoreExchanges) 
      throw new IllegalStateException("released");
    
    if (exchange != null) 
      throw new IllegalStateException("cannot make a new request because the previous response "
          + "is still open: please call response.close()");
    
  
  // 寻找ExchangeCodec对象
  ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
  // 通过找到的codec对象构建Exchange对象
  Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
  // 进行一些变量的赋值
  synchronized (connectionPool) 
    this.exchange = result;
    this.exchangeRequestDone = false;
    this.exchangeResponseDone = false;
    return result;
  

首先通过 exchangeFinder.find 方法进行了对 ExchangeCodec 的查找,找到对应的 ExchangeCodec 对象,之后通过这个 codec 对象构建了一个 Exchange 对象并返回。

  • ExchangeCodec:是一个连接所用的编码解码器,用于编码HTTP请求和解码HTTP响应。
  • Exchange:封装这个编码解码器的一个工具类,用于管理ExchangeCodec,处理实际的 I/O。

 

 

public ExchangeCodec find(
    OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) 
  int connectTimeout = chain.connectTimeoutMillis();
  int readTimeout = chain.readTimeoutMillis();
  int writeTimeout = chain.writeTimeoutMillis();
  int pingIntervalMillis = client.pingIntervalMillis();
  boolean connectionRetryEnabled = client.retryOnConnectionFailure();

  try 
      // findHealthyConnection 方法从而获取 RealConnection 对象
    RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
        writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
        //调用了 RealConnection.newCodec 方法获取 ExchangeCodec 对象
    return resultConnection.newCodec(client, chain);
   catch (RouteException e) 
    trackFailure();
    throw e;
   catch (IOException e) 
    trackFailure();
    throw new RouteException(e);
  

寻找可用连接

我们先看到 findHealthyConnection 方法:

 

 

/**
 * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
 * until a healthy connection is found.
 */
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
    int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
    boolean doExtensiveHealthChecks) throws IOException 
  while (true) 
      //可以看到这里是一个循环,不断地在调用 findConnection 方法寻找连接,
      //若找不到 Healthy(可用)的连接,则继续循环直到找到为止。
    RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
        pingIntervalMillis, connectionRetryEnabled);

    // If this is a brand new connection, we can skip the extensive health checks.
    synchronized (connectionPool) 
      if (candidate.successCount == 0 && !candidate.isMultiplexed()) 
        return candidate;
      
    

    // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
    // isn't, take it out of the pool and start again.
    if (!candidate.isHealthy(doExtensiveHealthChecks)) 
      candidate.noNewExchanges();
      continue;
    

    return candidate;
  

 

寻找连接的方法:findConnection 方法。

 

/**
 * Returns a connection to host a new stream. This prefers the existing connection if it exists,
 * then the pool, finally building a new connection.
 */
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
    int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException 
  boolean foundPooledConnection = false;
  RealConnection result = null;
  Route selectedRoute = null;
  RealConnection releasedConnection;
  Socket toClose;
  synchronized (connectionPool) 
    if (transmitter.isCanceled()) throw new IOException("Canceled");
    hasStreamFailure = false; // This is a fresh attempt.

    // Attempt to use an already-allocated connection. We need to be careful here because our
    // already-allocated connection may have been restricted from creating new exchanges.
    // 1、复用当前连接,检查这个连接是否可用和可复用
       releasedConnection = transmitter.connection;
        //如果连接不能创建Stream,则释放资源,返回待关闭的close socket
    toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
        ? transmitter.releaseConnectionNoEvents()
        : null;
     //证明连接可用
    if (transmitter.connection != null) 
      // We had an already-allocated connection and it's good.
      //存在可使用的已分配连接,复用当前连接
      result = transmitter.connection;
      releasedConnection = null;
    
     //没有可以使用的连接,就去连接池中寻找
    if (result == null) 
      // Attempt to get a connection from the pool.
      //2、从连接池中获取可用连接
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) 
        foundPooledConnection = true;
        result = transmitter.connection;
       else if (nextRouteToTry != null) 
        selectedRoute = nextRouteToTry;
        nextRouteToTry = null;
       else if (retryCurrentRoute()) 
        selectedRoute = transmitter.connection.route();
      
    
  
  closeQuietly(toClose);
  //回调
  if (releasedConnection != null) 
    eventListener.connectionReleased(call, releasedConnection);
  
  if (foundPooledConnection) 
    eventListener.connectionAcquired(call, result);
  
  if (result != null) 
    // If we found an already-allocated or pooled connection, we're done.
     //找到了一个已分配或者连接池中的连接,过程结束,返回该连接
    return result;
  

  // If we need a route selection, make one. This is a blocking operation.
  //否则,我们需要一个路由信息,这是个阻塞操作
  boolean newRouteSelection = false;
  if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) 
    newRouteSelection = true;
    routeSelection = routeSelector.next();
  

  List<Route> routes = null;
  synchronized (connectionPool) 
    if (transmitter.isCanceled()) throw new IOException("Canceled");

    if (newRouteSelection) 
      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. This could match due to connection coalescing.
      routes = routeSelection.getAll();
      //3、从连接池中获取可用连接(通过一组路由routes),通过更加全面的路由信息,再次从连接池中获取连接
      if (connectionPool.transmitterAcquirePooledConnection(
          address, transmitter, routes, false)) 
        foundPooledConnection = true;
        result = transmitter.connection;
      
    
     //如果还是没找到,则生成新的连接
    if (!foundPooledConnection) 
      if (selectedRoute == null) 
        selectedRoute = routeSelection.next();
      

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
     // 4、创建新连接
       result = new RealConnection(connectionPool, selectedRoute);
      connectingConnection = result;
    
  

  // If we found a pooled connection on the 2nd time around, we're done.
  //如果连接是从连接池中找到,则说明是可复用的。不是新生成的
    //如果新生成的连接则需要连接服务器才能使用
  if (foundPooledConnection) 
    eventListener.connectionAcquired(call, result);
    return result;
  

  // Do TCP + TLS handshakes. This is a blocking operation.
  //走到这说明是新生成的连接
    //tcp和tls握手,阻塞操作,连接server
  result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
      connectionRetryEnabled, call, eventListener);
      //将路由信息添加到routeDatabase的白名单中,证明该路由是可以连接到指定服务器的
  connectionPool.routeDatabase.connected(result.route());

//连接合并的最后一次尝试,只有我们尝试多次时才会发生
      //同一主机的并发连接
  Socket socket = null;
  synchronized (connectionPool) 
    connectingConnection = null;
    // Last attempt at connection coalescing, which only occurs if we attempted multiple
    // concurrent connections to the same host.
    // // 5、再获取一次连接,防止在新建连接过程中有其他竞争连接被创建了
    if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) 
      // We lost the race! Close the connection we created and return the pooled connection.
      //关闭创建的连接并返回连接池中的连接
      result.noNewExchanges = true;
      socket = result.socket();
      result = transmitter.connection;

      // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
      // that case we will retry the route we just successfully connected with.
      //  有可能获得一个不健康的连接,如果是这种情况,将重试刚刚成功连接的路由
      nextRouteToTry = selectedRoute;
     else 
        6、使用创建的新连接,放入连接池,并返回
      connectionPool.put(result);
      transmitter.acquireConnectionNoEvents(result);
    
  
  closeQuietly(socket);

  eventListener.connectionAcquired(call, result);
  return result;

 

 

 

 

 

 

 

 

 

 

 

 

 

 

可用连接的优先级为:当前连接>连接池中的连接>新的连接。

  • 如果当前连接可用,则优先选择当前连接
  • 如果当前连接不可用,则从连接池中获取连接
  • 如果连接池获取失败,则创建一个新的连接,并进行TCP和TSL握手,然后放到连接池中

连接复用可以省去TCP和TLS握手的过程,从而提高网络访问的效率。

 

获取连接的过程很复杂,大体流程如下:

1、检查当前连接是否可用。通过noNewExchanges判断这个连接是否可用。如果不可用,则会transmitter.connection置为空。当请求失败需要重试的时候或者重定向的时候,这时候连接还在,是可以直接进行复用的。

2、从连接池中获取可用连接。

下面两句代码的区别:

 

 

connectionPool.callAcquirePooledConnection(address, call, null, false)
connectionPool.callAcquirePooledConnection(address, call, routes, false)

 

1.多了一个routes字段。

涉及到HTTP/2的一个技术,叫做 HTTP/2 CONNECTION COALESCING(连接合并)。假设有两个域名,可以解析为相同的IP地址,并且是可以用相同的TLS证书(比如通配符证书),那么客户端可以重用相同的TCP连接从这两个域名中获取资源。

这个routes就是当前域名(主机名)可以被解析的ip地址集合,这两个方法的区别也就是一个传了路由地址,一个没有传。

 

继续看callAcquirePooledConnection代码:

 

 

boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
    @Nullable List<Route> routes, boolean requireMultiplexed) 
  assert (Thread.holdsLock(this));
  for (RealConnection connection : connections) 
    if (requireMultiplexed && !connection.isMultiplexed()) continue;
    if (!connection.isEligible(address, routes)) continue;
    transmitter.acquireConnectionNoEvents(connection);
    return true;
  
  return false;

首先传入的requireMultiplexed为false ,isEligible(Address address, @Nullable List<Route> routes) ,判断主机名、端口号等,如果请求完全相同就直接返回这个连接。 如果主机名不同,还可以判断是不是HTTP/2请求,如果是就继续判断路由地址,证书,如果都能匹配上,那么这个连接也是可用的。

 

3、创建新连接。

如果没有从连接池中获取到新连接,那么就创建一个新连接,其实就是调用到socket.connect进行TCP连接。

 

4、再从连接池获取一次连接,防止在新建连接过程中有其他竞争连接被创建了。

因为在创建过程中,有可能有其他的请求和你一起创建了新连接,所以我们需要再去取一次连接,如果有可以用的,就直接用它,防止资源浪费。

其实这里又涉及到HTTP2的一个知识点:多路复用。即不需要当前连接的上一个请求结束之后再去进行下一次请求,只要有连接就可以直接用。HTTP/2中可以保证在同一个域名只建立一路连接,并且可以并发进行请求。

5、新连接放入连接池,并返回。

 

 连接复用池:RealConnectionPool

1. 引用计数

在OkHttp中使用Transmitter类来计数。

计数加一和计数减一的操作其实是在改变List<Reference>列表的大小,List<Reference>的维护类是RealConnection,RealConnection是Socket物理连接的包装。List中的Transmitter弱引用数量就是socket被引用的计数,当计数为0时表示此连接是空闲的。

/** Current calls carried by this connection. */
final List<Reference<Transmitter>> transmitters = new ArrayList<>();

 

#Transmitter
void acquireConnectionNoEvents(RealConnection connection) 
  assert (Thread.holdsLock(connectionPool));

  if (this.connection != null) throw new IllegalStateException();
  this.connection = connection;
  //引用计数加1
  connection.transmitters.add(new TransmitterReference(this, callStackTrace));


/**
 * Remove the transmitter from the connection's list of allocations. Returns a socket that the
 * caller should close.
 */
@Nullable Socket releaseConnectionNoEvents() 
  assert (Thread.holdsLock(connectionPool));

  int index = -1;
  for (int i = 0, size = this.connection.transmitters.size(); i < size; i++) 
    Reference<Transmitter> reference = this.connection.transmitters.get(i);
    if (reference.get() == this) 
      index = i;
      break;
    
  

  if (index == -1) throw new IllegalStateException();

  RealConnection released = this.connection;
  //引用计数减一
  released.transmitters.remove(index);
  this.connection = null;

  if (released.transmitters.isEmpty()) 
    released.idleAtNanos = System.nanoTime();
    if (connectionPool.connectionBecameIdle(released)) 
      return released.socket();
    
  

  return null;

 

  • 2.RealConnectionPool
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
    Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
    new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));

/** The maximum number of idle connections for each address. */
//空闲的socket最大连接数	  
private final int maxIdleConnections;
 //socket的keepAlive时间
private final long keepAliveDurationNs;
//双向队列,里面维护了RealConnection也就是socket物理连接的包装
  private final Deque<RealConnection> connections = new ArrayDeque<>();
  //记录连接失败的route的黑名单,当连接失败的时候就会把失败的线路加进去
  final RouteDatabase routeDatabase = new RouteDatabase();
  boolean cleanupRunning; //是否正在清理
public final class ConnectionPool 
  final RealConnectionPool delegate;
  
  public ConnectionPool() 
  this(5, 5, TimeUnit.MINUTES);


public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) 
  this.delegate = new RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit);

    ... ...

这里使用了代理模式,通过ConnectionPool类我们可以发现默认空闲的socket最大连接数为5,socket的保活时间为5分钟,并且在构造ConnectionPool对象时实际构造的是RealConnectionPool对象。

 

 

  • 3. 缓存操作

3.1 放入连接

void put(RealConnection connection) 
  assert (Thread.holdsLock(this));
  if (!cleanupRunning) 
    cleanupRunning = true;
      //使用线程池执行清理任务
    executor.execute(cleanupRunnable);
  
  //将连接添加到双端队列中
  connections.add(connection);

放入连接的工作有两个:

  • 如果当前连接池没有在清理连接,则先使用线程池执行清理任务并且将正在清理的标志位设置为true
  • 将当前连接添加到双端队列中

 

3.2 清理连接

在放入连接时我们会执行清理连接的操作,会调用线程池执行cleanupRunnable的任务,让我们先看看这个任务

//线程不断调用cleanup来进行清理,并返回下次需要清理的间隔时间
private final Runnable cleanupRunnable = () -> 
  while (true) 
      //清理连接,并返回下次需要清理的间隔时间
    long waitNanos = cleanup(System.nanoTime());
    if (waitNanos == -1) return;
    if (waitNanos > 0) 
      long waitMillis = waitNanos / 1000000L;
      waitNanos -= (waitMillis * 1000000L);
      synchronized (RealConnectionPool.this) 
        try 
          RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
         catch (InterruptedException ignored) 
        
      
    
  
;

可以发现一旦清理任务开始执行后,就会每隔指定的间隔时间进行清理连接。

 

RealConnectionPool#cleanup

long cleanup(long now) 
 //正在使用的连接数量
  int inUseConnectionCount = 0;
  //空闲的连接数量
  int idleConnectionCount = 0;
  //长时间空闲的连接
  RealConnection longestIdleConnection = null;
  //最长空闲时间
  long longestIdleDurationNs = Long.MIN_VALUE;

  // Find either a connection to evict, or the time that the next eviction is due.
  synchronized (this) 
      // 遍历连接
    for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) 
      RealConnection connection = i.next();

     //查询此连接的transmitter的引用数量
        //如果引用数量大于0,则使用数量inUseConnectionCount加1
        //否则闲置数量idleConnectionCount加1
      // If the connection is in use, keep searching.
      if (pruneAndGetAllocationCount(connection, now) > 0) 
        inUseConnectionCount++;
        continue;
      

      idleConnectionCount++;

      // If the connection is ready to be evicted, we're done.
      //寻找空闲最久的那个连接
      //得到空闲时间
      long idleDurationNs = now - connection.idleAtNanos;
      if (idleDurationNs > longestIdleDurationNs) 
          //最长空闲时间
        longestIdleDurationNs = idleDurationNs;
        //最长空闲连接
        longestIdleConnection = connection;
      
    
  //如果空闲连接的空闲时间超过5分钟,或者空闲连接数超过5个,则移除空闲最久的连接
    if (longestIdleDurationNs >= this.keepAliveDurationNs
        || idleConnectionCount > this.maxIdleConnections) 
      // We've found a connection to evict. Remove it from the list, then close it below (outside
      // of the synchronized block).
      connections.remove(

以上是关于OKHttp源码分析的主要内容,如果未能解决你的问题,请参考以下文章

okhttp源码分析

OkHttp 基本使用&源码分析

OkHttp 基本使用&源码分析

《OkHttp源码分析》之 OkHttp的缓存管理

OKHttp源码分析

OKHttp源码分析