三、深入理解OkHttp:连接处理-ConnectIntercepter

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了三、深入理解OkHttp:连接处理-ConnectIntercepter相关的知识,希望对你有一定的参考价值。

参考技术A

终于来到OkHttp的网络连接模块,这块内容是OkHttp的核心内容。我们知道Http的连接需要进行3此握手,断开需要4次挥手。而连接的每一次握手,都需要进行Socket连接、释放,这是一个非常麻烦而且耗时耗力的过程。那么连接的服用就显得尤为重要了,同个地址的连接,如果在用完后不断开,保持连接,在下次的请求中便能重复使用这个连接,节省了连接的时间。 这对于大部分时间需要重复频繁访问同一个服务器地址的移动端网络来说更加不可或缺。

在本篇文章中,我们将以ConnectIntercepter为起点,跟随网络连接获取的过程,深入探究其中涉及到的:连接查找、连接复用,网络连接的建立(三次握手、Http2协议等的处理)。面对这复杂的过程,我们先总体的走一遍连接获取过程,然后在后续介绍 RealConnection.java ConnectionPool.java 来更深入的理解连接的建立和缓存查找等逻辑。除此之外,我们还需要先看一下另一个类: Transmitter.java ,它将在connect的过程中起到重要的地位。

总结:Transmitter是在创建RealCall的时候被创建的,其中需要了OkHttpClient和当前请求Call作为参数。所以我们知道了,一个请求对应着一个Transmitter。而且,它的成员变量里有ExchangeFinder等类,负责为这个请求查找到一个合适的请求。

这个方法是释放一个连接,该方法在后面的查找连接中会涉及到,我们在这里先对其进行讲述。

总结 :这是一个请求关闭一个连接的过程。

从上面可以看到,在执行第一个默认拦截器的逻辑的时候,调用transmitter.prepareToConnect()方法。我们接下去看一下这个方法做了上面准备工作。

总结:其实这个方法,重点就是为连接作准备。但是主要目的还是找到可以复用的连接。它的逻辑如下:

总结: 这个方法是代表Transmitter获得了一个可用的连接了。那么它做的工作是将这个连接保存起来。然后将自己登记到RealConnection。这个方法后面会有用到,这里先讲解一下。

有了章节二的预备知识后,我们可以来看ConnectIntercepter了。不过他只是触发打开连接的按钮,真正连接的查找和连接逻辑在exchangeFinder.java和Exchage.java。不管怎么样,我们先来看一下开始的地方。

调用transmitter的newExcahge()方法,得到一个可以与远程地址进行通行的Exchage,然后就丢给下一个拦截器了。顺带说一下,在第一篇《》我们知道,紧跟着ConnectIntercepter的下一个拦截器是ServerIntercepter,那我们可以很容易的推理出,它拿到了ConnectIntercepter的excahge后,就进行了数据传输和数据接收。

调用exchangeFinder.find()找到一个连接,返回ExchangeCodec。ExchangeCodec是一个接口,它代表着Http请求的加密,和响应的解密。它有2个具体实现:Http1ExchangeCodec和Http2ExchangeCodec,它的详细内容详见【4】。我们继续看连接的查找。

总结: 该方法顾名思义,就是通过一个while(true)不断的找一个连接候选人,然后检查是否健康可用的,如果不能用就进行标记,丢弃。详细的如下:

接下来就是重中之重了,让我们来一起品味这很香的查找逻辑。

总结:这是一个查找连接的过程,在查找的时候,综合考虑了自身的连接,路由的结果,连接池的复用,和新建几种方案。具体的如下:

总结: 根据连接性质不一样,生成不同的数据加解密器。

章节小结:本节从ConnectIntercepter开始,追寻了一个连接如何被获得的过程,它涉及到了新建连接、路由选择,连接池复用等逻辑,最终的产物是Exchange,由它去到下一个拦截器:ServerIntercepter进行网络传输工作。其中Exchange、RealConnectionPool起到了很重要角色,我们将在下一小节中解析

RealConnection,描述的是一次与远程服务器的连接,所以它需要具备与远程地址进行建立连接,通行的能力。这些能里我们可以在后续它的成员变量和方法中看出来。照例,我们来看一下的构造函数和成员变量。

总结: 一些主要的成员变量已经如上列出注释。接下来从它最重要的方法connect()入手来理解它的作用。

总结:该方法是Connection处理连接逻辑的地方,主要包括一下几点:

总结: 创建隧道连接,就是在Http代理的代理上建立Https连接。主要的做了如下事情:

总结: 该方法是与远程服务器地址建立起Socket连接,并获得输入输出流。具体的如下:

在这一步connect过后,socket完成了3次握手建立TCP连接。

总结: 该方法根据请求协议,来确定建立的连接是否需要进一步协议处理。具体的如下:

总结: 在这个方法里,连接将进行SSL配置,三次握手,证书校验等工作。具体的如下:

总结: 可以看到,对SSLScoket配置,就是遍历connectionSpecs集合,然后挑出适合于这个sslScoket的配置,然后进行要用。具体的如下:

总结: 对这个socket设置tls版本和密码套件

总结: 这个方法在后续的解析中会涉及到,所以先放在这里讲了。主要是用来判断这个连接可不可以复用的。判断条件如注释。

在3.6的findConnetion过程中,我们看到了很多次连接池的身影,它对连接的复用也起着绝对重要的位置,如果不仔细的理解它的话,查找连接这块的逻辑就会少一大快。照例,从它的出生、成员变量和构造函数来初步认识它。

在Builder()里创建默认的连接池。

总结: 可以看出到,ConectionPool才用代理模式,实际逻辑交给RealConnection()。5个最大空闲连接,每个连接可保活5分钟。

总结: 可以看出,这个连接池是用来管理同个地址的连接的。它提供根据地址查找可用连接、清除连接等功能。接下来介绍一下它的几个重要方法。

总结: 遍历保存的连接,调用RealConnection.isEligible() 来判断这个连接是否符合条件。将这个请求的Transmitter登记到RealConnection。

总结: 将一个连接变为空闲连接。如果此时这个连接不可用的话,将连接从连接集合中移除,并返回true。如果还可以,通知清理任务执行,并返回false。

总结: 该方法是将一个连接放入连接池中,然后执行清理任务,不过它会被堵塞住,直到【5.4】方法触发。

总结: 这是一个清理连接的方法,它做的使其如下:

小篇结:本篇是介绍OkHttp的网络连接建立。开篇先介绍了Trasnmitter这一重要的类,随后从ConnectIntercepter入手,深入研究了连接Connection的获取逻辑。在获取的过程中,我们将到了连接缓存的处理。当获取不到缓存的时候,便会新建一个全新的网络连接,在这个过程中会进行Http的3次握手等过程。在最后2小节中,分别介绍了在整个过程中的中心类,被查找对象:RealConnection。和管理缓存Connection的ConnectionPool。最后以一张图来总结这一过程

深入理解OkHttp源码——网络操作

这篇博客侧重于了解OkHttp的网络部分,包括Socket的创建、连接,连接池等要点。OkHttp对Socket的流操作使用了Okio进行了封装,本篇博客不做介绍,想了解的朋友可以参考拆轮子系列:拆Okio

OkHttp中关于网络的几个概念

下面的主要翻译自OkHttp的官方文档,查看原文.

URL

URLs(比如https://github.com/square/okhttp)是HTTP和网络的基础,不止指定了Web上的资源,还指定了如何获取该资源。

Address

Address(比如github.com)指定了一个webserver和所有连接到该服务器的必需的静态配置:端口、HTTPS设置和首选网络协议(HTTP/2或SPDY)。
URLs属于同一个address的可以共享同一个底层的Socket连接。共享一个连接具有显著的性能优势:低延迟、高吞吐量(由于TCP慢启动)和省电。OkHttp使用连接池自动再利用HTTP/1.x的连接,复用HTTP/2和SPDY的连接。
在OkHttp中,address的一些字段来自URL(模式、主机名、端口),剩下的部分来自OkHttpClient。

Routes

Routes提供真正连接到一个网络服务器所需的动态信息。这指定了尝试的IP地址(或者进过DNS查询得到的地址)、使用的代理服务器(如果使用了ProxySelector)和使用哪个版本的TLS进行谈判。(对于HTTPS连接)
对于一个地址,可能有多个路由。举个例子,一个网路服务器托管在多个数据中心,那么在DNS中可能会产生多个IP地址。

Connections

当请求一个URL时,OkHttp会做以下几件事情:
1. 使用URL和配置好的OkHttpClient创建一个address。这个地址指明了我们将如何连接网络服务器。
2. 尝试从连接池中得到该地址的一条连接
3. 如果在连接池中没有找到一条连接,那么选择一个route进行尝试。通常这意味着做一个DNS请求得到服务器IP的地址,必要时会选择一个TLS版本和一个代理服务器。
4. 如果是一条新的路由,那么建立一条直接的socket连接或TLS通道(HTTPS使用HTTP代理)或一个直接的TLS连接。
5. 发送HTTP请求,读取响应。
如果连接出现了问题,OkHttp会选择另外一条路由进行再次尝试。这使得OkHttp在一个服务器的一些地址不可到达时仍然可用。
一旦读取到响应后,连接将会退还到连接池中以便可以复用。连接在池中闲置一段时间后将会被释放。

结合源码进行分析

Address的创建

Address的创建在RetryAndFollowupInterceptor中的createAddress方法中,代码如下:

private Address createAddress(HttpUrl url) {
    SSLSocketFactory sslSocketFactory = null;
    HostnameVerifier hostnameVerifier = null;
    CertificatePinner certificatePinner = null;
    //如果是HTTPS协议
    if (url.isHttps()) {
      sslSocketFactory = client.sslSocketFactory();
      hostnameVerifier = client.hostnameVerifier();
      certificatePinner = client.certificatePinner();
    }

    //可以看到Address的构造方法中的一部分参数由URL提供,一部分由OkHttpClient提供
    return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
        sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
        client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
  }

 

从代码中可以看出,Address的信息一部分由URL提供,主要包括主机名和端口;另一部分由OkHttpClient提供,如dns、socketFactory等等。
根据HttpUrl是否是HTTPS,创建sslSocketFactory等字段,而在Address的构造方法中,则根据sslSocketFactory是否为null判断是HTTP模式还是HTTPS模式。

StreamAllocation的创建

StreamAllocation类负责管理连接、流和请求三者之间的关系。其创建在RetryAndFollowupInterceptor的intercept方法中,使用OkHttpClient的连接池以及上面创建的Address进行初始化,代码如下:

streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()))

其中client的连接池是在OkHttpClient.Builder中设置的,而其设置在Builder的构造方法中,调用的是ConnectionPool的默认构造方法,代码如下:

 public Builder() {
      ...
      //默认连接池
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
    }
  • 1
  • 2
  • 3

下面是ConnectionPool的构造方法:

  /**
   * Create a new connection pool with tuning parameters appropriate for a single-user application.
   * The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
   * this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
   */
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }

从上面可以看到,默认的连接池的最大空闲连接数为5,最长存活时间为5min。

HttpStream和Connection的创建

深入理解OkHttp源码(二)——获取响应中,我们知道了HttpStream以及Connection的创建都是在ConnectInterceptor拦截器中,代码如下:

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpStream httpStream = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpStream, connection);
  }

 

从上面的代码可以看到,首先调用StreamAllocation的newStream方法就可以得到HttpStream对象,同时也就得到了Connection对象。下面首选从StreamAllocation的newStream()方法看起:

  public HttpStream newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    //得到连接时长、读超时以及写超时参数
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      //得到一个健康的连接
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);

      HttpStream resultStream;
      //如果协议是HTTP 2.x协议
      if (resultConnection.framedConnection != null) {
        resultStream = new Http2xStream(client, this, resultConnection.framedConnection);
      }
      //协议是HTTP 1.x,设置连接底层的Socket属性
      else {
        resultConnection.socket().setSoTimeout(readTimeout);
        resultConnection.source.timeout().timeout(readTimeout, MILLISECONDS);
        resultConnection.sink.timeout().timeout(writeTimeout, MILLISECONDS);
        resultStream = new Http1xStream(
            client, this, resultConnection.source, resultConnection.sink);
      }

      synchronized (connectionPool) {
        stream = resultStream;
        return resultStream;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

从上面的代码可以看出,首先从OkHttpClient中获取连接超时、读取超时、写超时和是否连接失败重试参数,然后试图找到一条健康的连接,接下来是根据连接的framedConnection字段是否为null,得到Http2xStream或Http1xStram,前者是HTTP/2的实现,后者是HTTP/1.x的实现。
可以看到主要的逻辑肯定都在findHealthyConnection方法中,下面是findHeadlthyConnection方法的实现:

 /**
   * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
   * until a healthy connection is found.
   */
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
      //死循环
    while (true) {
      //得到一个候选的连接
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      // 如果是一个全新的连接,跳过额外的健康检查
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      //如果候选连接通不过额外的健康检查,那么继续寻找一个新的候选连接
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
  }

从注释中可以看到,该方法用于查找一条健康的连接并返回,如果连接不健康,那么会重复查找,直到查找到健康的连接。可以看到方法内是一个死循环,首先调用findConnection方法得到候选的连接,如果该连接是一个全新的连接,那么就直接返回不需要验证是否健康,如果不是则需要验证是否健康,如果不健康调用noNewStreams()方法后继续下一次循环,否则返回。对于候选连接,总结一下就是下面几种情况:
1. 候选连接是一个全新的连接,那么直接返回;
2. 候选连接不是一个全新的连接,但是是健康的,那么直接返回;
3. 候选连接不是一个全新的连接,并且不健康,那么继续下一轮循环
经过上面的分析,我们查看findConnection()方法:

/**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   */
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    //对连接池加锁,因为可能会有别的线程加入连接或移除连接
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (stream != null) throw new IllegalStateException("stream != null");
      if (canceled) throw new IOException("Canceled");

      //首先尝试使用本实例的连接
      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      //其次,尝试从连接池中得到连接
      RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);
      if (pooledConnection != null) {
        this.connection = pooledConnection;
        return pooledConnection;
      }

      selectedRoute = route;
    }

    if (selectedRoute == null) {
      selectedRoute = routeSelector.next();
      synchronized (connectionPool) {
        route = selectedRoute;
        refusedStreamCount = 0;
      }
    }
    //根据路由创建新的连接
    RealConnection newConnection = new RealConnection(selectedRoute);
    acquire(newConnection);

    //将得到的新连接加入连接池中并设置本实例的连接
    synchronized (connectionPool) {
      Internal.instance.put(connectionPool, newConnection);
      this.connection = newConnection;
      if (canceled) throw new IOException("Canceled");
    }

    //底层Socket连接
    newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(),
        connectionRetryEnabled);
    routeDatabase().connected(newConnection.route());

    return newConnection;
  }

 

从注释中可以看出,该方法返回一个拥有新流的连接。首先检查已存在的连接,其次连接池,最后建立一个新的连接。
从代码中可以看出,首先对连接池加锁,这儿的连接池是在创建StreamAllocation中传入的,而那个参数是在创建OkHttpClient时就创建的,我们一般使用OkHttpClient时,都会将其做成单例,那么连接池就是唯一的,由于可能存在别的线程从连接池中执行插入以及连接池自身连接的清除工作,所以需要对其进行加锁。首先获取本对象的connection,如果不为null并且noNewStreams为false,那么直接使用本连接;如果不能使用本连接,那么尝试从连接池中获取连接,如果可以得到,那么直接返回,否则将进行下一步创建新连接;首先根据路由创建一个新的连接,然后调用acquire方法使连接持有该StreamAllocation对象,接下来将新的连接添加就连接池,最后调用connect方法进行连接。
这里面有一个Internal.instance的实例,Internal是一个抽象类,其具体实现instance初始化是在OkHttpClient的静态初始化块中,如下:

static {
    Internal.instance = new Internal() {
      @Override public void addLenient(Headers.Builder builder, String line) {
        builder.addLenient(line);
      }

      @Override public void addLenient(Headers.Builder builder, String name, String value) {
        builder.addLenient(name, value);
      }

      @Override public void setCache(OkHttpClient.Builder builder, InternalCache internalCache) {
        builder.setInternalCache(internalCache);
      }

      @Override public boolean connectionBecameIdle(
          ConnectionPool pool, RealConnection connection) {
        return pool.connectionBecameIdle(connection);
      }

      @Override public RealConnection get(
          ConnectionPool pool, Address address, StreamAllocation streamAllocation) {
        return pool.get(address, streamAllocation);
      }

      @Override public void put(ConnectionPool pool, RealConnection connection) {
        pool.put(connection);
      }

      @Override public RouteDatabase routeDatabase(ConnectionPool connectionPool) {
        return connectionPool.routeDatabase;
      }

      @Override public StreamAllocation callEngineGetStreamAllocation(Call call) {
        return ((RealCall) call).streamAllocation();
      }

      @Override
      public void apply(ConnectionSpec tlsConfiguration, SSLSocket sslSocket, boolean isFallback) {
        tlsConfiguration.apply(sslSocket, isFallback);
      }

      @Override public HttpUrl getHttpUrlChecked(String url)
          throws MalformedURLException, UnknownHostException {
        return HttpUrl.getChecked(url);
      }

      @Override public void setCallWebSocket(Call call) {
        ((RealCall) call).setForWebSocket();
      }
    };
  

首先看put方法,因为一开始时连接池中肯定是没有连接的,Internal.instance的put方法调用了连接池的put方法,下面是ConnectionPool的put方法:

void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    //如果清理线程没有开启,则开启
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

从代码中可以看出,当第一个连接被添加就线程池时,开启清除线程,主要清除那些连接池中过期的连接,然后将连接添加就connections对象中。下面看一下cleanupRunnable和connections的定义,其中connections是一个阻塞队列。

private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        //得到下一次清除的等待时长
        long waitNanos = cleanup(System.nanoTime());
        //没有连接了,清除任务终结
        if (waitNanos == -1) return;
        //需要等待一定时长
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

  private final Deque<RealConnection> connections = new ArrayDeque<>();

 

可以看到cleadupRunnbale是一个死循环,调用cleanup方法进行清理工作并返回一个等待时长,如果有等待时长,那么让连接池进行休眠。其中清理工作在cleanup方法中,代码如下:

/**
   * Performs maintenance on this pool, evicting the connection that has been idle the longest if
   * either it has exceeded the keep alive limit or the idle connections limit.
   *
   * <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
   * -1 if no further cleanups are required.
   */
  long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
      //检查每个连接
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        //如果该连接正在运行,则跳过
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }

        idleConnectionCount++;

        //查找出空闲时间最长的连接
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      //如果时间超出规定的空闲时间或者数量达到最大空闲树,那么移除。关闭操作在后面
      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        connections.remove(longestIdleConnection);
      }
      //如果时间和数量都没到达上限,那么得到存活时间
      else if (idleConnectionCount > 0) {
        return keepAliveDurationNs - longestIdleDurationNs;
      }
      //如果所有连接都在使用中,返回最大存活时间
      else if (inUseConnectionCount > 0) {
        return keepAliveDurationNs;
      }
      //没有连接,关闭清除线程
      else {
        cleanupRunning = false;
        return -1;
      }
    }

    //关闭连接底层的Socket
    closeQuietly(longestIdleConnection.socket());

    // 再次执行清除
    return 0;
  }

从代码中可以看出,对当前连接池中保存的所有连接进行遍历,然后调用pruneAndGetAllocationCount()方法获取连接上可用的StreamAllocation的数量以及删除不可用的StreamAllocation,如果数量大于0,则表示该连接还在使用,那么继续下一次遍历;否则空闲连接数+1,需要查找出所有不可用的连接中最大的空闲时间。遍历做完后,根据不同情况不同的值返回不同的结果,一旦找到了最大的空闲连接,那么在同步块外部调用closeQuietly关闭连接。
pruneAndGetAllocationCount()方法用于删除连接上不可用的StreamAllocation以及可用的StreamAllocation的数量,下面是其具体实现:

/**
   * Prunes any leaked allocations and then returns the number of remaining live allocations on
   * {@code connection}. Allocations are leaked if the connection is tracking them but the
   * application code has abandoned them. Leak detection is imprecise and relies on garbage
   * collection.
   */
  private int pruneAndGetAllocationCount(RealConnection connection, long now) {
  //得到关联在连接上StramAllocation对象列表
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);


      //可用
      if (reference.get() != null) {
        i++;
        continue;
      }

      // We‘ve discovered a leaked allocation. This is an application bug.
      Platform.get().log(WARN, "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?", null);
      references.remove(i);
      connection.noNewStreams = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }

    return references.size();
  }

 

需要注意的是for循环,i的控制在循环内部,如果StreamAllocation为null,那么直接删除,如果连接没有一个可用的StreamAllocation,那么设置连接的idleAtNanos为now-keepAliveDurationNs,即5分钟之前。
至此,我们分析完了当创建了一个新连接,是如何被添加到线程池中的以及线程池的自动清除线程是如何工作的。下面看连接是如何建立连接的,在findConnection方法中,当创建了一个新的Connection后,调用了其connect方法,connect负责将客户端Socket连接到服务端Socket,代码如下:

 public void connect(int connectTimeout, int readTimeout, int writeTimeout,
      List<ConnectionSpec> connectionSpecs, boolean connectionRetryEnabled) {
    if (protocol != null) throw new IllegalStateException("already connected");

    RouteException routeException = null;
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

    //不是HTTPS协议
    if (route.address().sslSocketFactory() == null) {
      if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication not enabled for client"));
      }
      String host = route.address().url().host();
      if (!Platform.get().isCleartextTrafficPermitted(host)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication to " + host + " not permitted by network security policy"));
      }
    }

    while (protocol == null) {
      try {
        if (route.requiresTunnel()) {
          buildTunneledConnection(connectTimeout, readTimeout, writeTimeout,
              connectionSpecSelector);
        } else {
          buildConnection(connectTimeout, readTimeout, writeTimeout, connectionSpecSelector);
        }
      } catch (IOException e) {
        closeQuietly(socket);
        closeQuietly(rawSocket);
        socket = null;
        rawSocket = null;
        source = null;
        sink = null;
        handshake = null;
        protocol = null;

        if (routeException == null) {
          routeException = new RouteException(e);
        } else {
          routeException.addConnectException(e);
        }

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException;
        }
      }
    }
  }

主要看while循环处,如果HTTPS通道使用HTTP代理,那么调用buildTunneledConnection方法,否则调用buildConnection方法,如果出现异常,那么就在catch中做了一些清理工作,然后会继续进入循环,因为将protocol置为了null。一般的请求都是直接调用buildConnection方法的,下面我们看buildConnection方法:

 /** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
  private void buildConnection(int connectTimeout, int readTimeout, int writeTimeout,
      ConnectionSpecSelector connectionSpecSelector) throws IOException {
    connectSocket(connectTimeout, readTimeout);
    establishProtocol(readTimeout, writeTimeout, connectionSpecSelector);
  }

 

该方法做在raw socket上连接HTTP或HTTPS连接的准备工作,方法内部又是调用了另外两个方法,下面分别介绍。
connectSocket为创建Socket以及连接Socket,代码如下:

private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    //创建Socket
    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    rawSocket.setSoTimeout(readTimeout);
    try {
      //连接Socket
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      throw new ConnectException("Failed to connect to " + route.socketAddress());
    }
    //使用Okio封装Socket的输入输出流
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));
  }

从代码可以看出,首先获取代理和地址,然后根据代理的类型是使用SocketFactory工厂创建无参的rawSocket还是使用带代理参数的Socket构造方法,得到了rawSocket对象后,设置读超时,然后调用connectSocket进行Socket的连接,服务器的信息在route的socketAddress中,最后,得到rawSocket的输入流和输出流,这里使用了Okio进行了封装,就不做过多设计了。
其中Plateform.get()方法返回不同平台的信息,因为OkHttp是可以用于AndroidJava平台的,而Java又有多个版本,所以进行了平台判断。get()是一个单例,其初始化在findPlatform方法中,如下:

private static Platform findPlatform() {
    Platform android = AndroidPlatform.buildIfSupported();

    if (android != null) {
      return android;
    }

    Platform jdk9 = Jdk9Platform.buildIfSupported();

    if (jdk9 != null) {
      return jdk9;
    }

    Platform jdkWithJettyBoot = JdkWithJettyBootPlatform.buildIfSupported();

    if (jdkWithJettyBoot != null) {
      return jdkWithJettyBoot;
    }

    // Probably an Oracle JDK like OpenJDK.
    return new Platform();
  }

可以看到findPlatform分为了android平台、jdk9、有JettyBoot的jdk还有默认的平台几类。这边看默认的Platform就可以了。下面看其socket方法:

public void connectSocket(Socket socket, InetSocketAddress address,
      int connectTimeout) throws IOException {
    socket.connect(address, connectTimeout);
  }

可以看到就是调用socket的connect方法,至此,本地Socket与后台Socket建立了连接,并得到了输入输出流。
buildConnection方法中还有一个establishProtocol方法,该方法用于建立协议,设置protocol的值,这样上面的循环就可以跳出了。代码如下:

private void establishProtocol(int readTimeout, int writeTimeout,
      ConnectionSpecSelector connectionSpecSelector) throws IOException {
    //如果是HTTPS协议
    if (route.address().sslSocketFactory() != null) {
      connectTls(readTimeout, writeTimeout, connectionSpecSelector);
    }
    //默认HTTP 1.1协议
    else {
      protocol = Protocol.HTTP_1_1;
      socket = rawSocket;
    }

    if (protocol == Protocol.SPDY_3 || protocol == Protocol.HTTP_2) {
      socket.setSoTimeout(0); // Framed connection timeouts are set per-stream.

      FramedConnection framedConnection = new FramedConnection.Builder(true)
          .socket(socket, route.address().url().host(), source, sink)
          .protocol(protocol)
          .listener(this)
          .build();
      framedConnection.start();

      // Only assign the framed connection once the preface has been sent successfully.
      this.allocationLimit = framedConnection.maxConcurrentStreams();
      this.framedConnection = framedConnection;
    } else {
      this.allocationLimit = 1;
    }
  }

可以看到该方法主要就是给protocol赋值,另外对于SPDY或HTTP/2协议有别的处理,这儿就不多介绍了。(==因为我自己目前也不懂,不过分析到这儿就已经足够了==)。
至此,我们分析完了是如何新建一个连接,然后将其放入连接池以及真正地与后台建立连接的,这一切都是发生在ConnectInterceptor中,所以也就可以理解为什么这个拦截器要命名为连接拦截器了。
上面的代码主要分析了新建连接,从上面的分析我们知道,还可以直接使用StreamAllocation的连接或从连接池中获取连接。我们知道当提交请求后,每个请求被封装成RealCall对象,而每个RealCall对象都只能被执行一次,RealCall对象持有RetryAndFollowupInterceptor,Connection又是RetryAndFollowupInterceptor持有的,那么如果发生重定向时,但是主机名相同,只是路径不同时,那么将会是重用之前创建的Connection;而如果是两个相同主机的不同请求,那么在第一个连接被创建放进线程池后,第二个请求的连接就可以从连接池中得到了。
findConnection方法中通过调用Internal.instance的get方法从连接池中获取连接,而其get方法又是通过调用连接池的get方法,具体代码如下:

/** Returns a recycled connection to {@code address}, or null if no such connection exists. */
  RealConnection get(Address address, StreamAllocation streamAllocation) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.allocations.size() < connection.allocationLimit
          && address.equals(connection.route().address)
          && !connection.noNewStreams) {
        streamAllocation.acquire(connection);
        return connection;
      }
    }
    return null;
  }

 

从上面代码中可以看出,get方法对连接池队列遍历,如果连接的StreamAllocation小于allocationLimit参数并且地址相等且连接的noNewStreams为false,那么将streamAllocation赋给连接。其中allocationLimit在协议为HTTP/1.x时为1,这也就意味着同一个Connection只能与一个StreamAllocation绑定,这就解释了为什么官方文档文档说连接池重用HTTP/1.x连接,复用HTTP/2或SPDY连接。

发送请求和获取响应

经过ConnectInterceptor后,为请求创建了Connection对象以及HttpStream对象,下面进入到CallServerInterceptor中发送请求和获取响应,首先看CallServerInterceptor的intercept方法:

@Override public Response intercept(Chain chain) throws IOException {

    HttpStream httpStream = ((RealInterceptorChain) chain).httpStream();
    StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
    Request request = chain.request();

    long sentRequestMillis = System.currentTimeMillis();
    //发送HTTP首部信息
    httpStream.writeRequestHeaders(request);

    //如果HTTP方法允许有请求主体并且请求不为null,发送HTTP请求主体信息
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      //Okio进行封装发送数据
      Sink requestBodyOut = httpStream.createRequestBody(request, request.body().contentLength());
      BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
      request.body().writeTo(bufferedRequestBody);
      bufferedRequestBody.close();
    }

    httpStream.finishRequest();

    //读响应首部构建Response对象
    Response response = httpStream.readResponseHeaders()
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    if (!forWebSocket || response.code() != 101) {
      response = response.newBuilder()
          .body(httpStream.openResponseBody(response))
          .build();
    }

    //服务端不支持HTTP持久连接,那么需要关闭该连接
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    int code = response.code();
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }

 

可以看到写请求和读响应都是通过HttpStream对象,在前面的分析中知道了HttpStream的具体实现是Http1xStream或Http2xStream。我们主要看Http1xStream的各个实现,首先看写头部信息的writeRequestHeaders方法,下面是Http1xStream的具体实现:

@Override public void writeRequestHeaders(Request request) throws IOException {
    //得到请求行
    String requestLine = RequestLine.get(
        request, streamAllocation.connection().route().proxy().type());
    writeRequest(request.headers(), requestLine);
  }

 

该方法用户将头信息发送给服务端,首先获取HTTP请求行(类似于“GET / HTTP/1.1”),然后调用writeRequest方法进行具体的写操作,下面是writeRequest的实现:

public void writeRequest(Headers headers, String requestLine) throws IOException {
    if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
    sink.writeUtf8(requestLine).writeUtf8("\r\n");
    for (int i = 0, size = headers.size(); i < size; i++) {
      sink.writeUtf8(headers.name(i))
          .writeUtf8(": ")
          .writeUtf8(headers.value(i))
          .writeUtf8("\r\n");
    }
    sink.writeUtf8("\r\n");
    state = STATE_OPEN_REQUEST_BODY;
  }

 

从代码中可以看出,首先判断状态,状态初始值为STATE_IDLE,表明如果在写头部信息之前做了别的操作,那么将会报错,也就意味着必须首先进行写头部信息的操作;然后写入请求行以及换行符,接下来就是对头部信息做遍历,逐个写入,最后将状态置为STATE_OPEN_REQUEST_BODY。
在写完头部信息之后,如果需要写请求的主体部分,还会进行写主体部分操作,当请求发送完成后,调用finishRequest方法就行刷新输出流。

 @Override public void finishRequest() throws IOException {
    sink.flush();
  }

 

发送完请求之后,首先调用readResponseHeaders()获取响应的头部信息,然后构造Response对象,readResponseHeaders代码如下:

@Override public Response.Builder readResponseHeaders() throws IOException {
    return readResponse();
  }

  ** Parses bytes of a response header from an HTTP transport. */
  public Response.Builder readResponse() throws IOException {
    if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
      throw new IllegalStateException("state: " + state);
    }

    try {
      while (true) {
        StatusLine statusLine = StatusLine.parse(source.readUtf8LineStrict());

        Response.Builder responseBuilder = new Response.Builder()
            .protocol(statusLine.protocol)
            .code(statusLine.code)
            .message(statusLine.message)
            .headers(readHeaders());

        if (statusLine.code != HTTP_CONTINUE) {
          state = STATE_OPEN_RESPONSE_BODY;
          return responseBuilder;
        }
      }
    } catch (EOFException e) {
      // Provide more context if the server ends the stream before sending a response.
      IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
      exception.initCause(e);
      throw exception;
    }
  }

 

可以看到readResponseHeaders方法又调用了readResponse方法,而readResponse方法中首先对状态进行判断,然后进入一个死循环。首先获取响应的状态行(比如“H T T P / 1 . 1 2 0 0 T e m p o r a r y R e d i r e c t”)得到协议类型、状态码和消息,然后再调用readHeaders()方法读取头部信息,最后比较状态码不是100,那么说明请求发送完整了,那么将状态置为STATE_OPEN_RESPONSE_BODY,然后返回响应,这时的响应中只有协议类型、状态码、消息和头部信息。下面看一下readHeaders()方法是如何获取头部信息的:

/** Reads headers or trailers. */
  public Headers readHeaders() throws IOException {
    Headers.Builder headers = new Headers.Builder();
    // parse the result headers until the first blank line
    for (String line; (line = source.readUtf8LineStrict()).length() != 0; ) {
      Internal.instance.addLenient(headers, line);
    }
    return headers.build();
  }
  • 1

可以看到每行遍历直到第一个空行,然后调用Internal.instance的addLenient方法将这一行的信息解析并添加到头部中,下面是addLenient方法的实现:

 @Override public void addLenient(Headers.Builder builder, String line) {
        builder.addLenient(line);
      }

 

可以看到只是简单的调用Builder的addLenient方法,那么继续看Builder的addLenient方法:

 Builder addLenient(String line) {
      int index = line.indexOf(":", 1);
      if (index != -1) {
        return addLenient(line.substring(0, index), line.substring(index + 1));
      } else if (line.startsWith(":")) {
        // Work around empty header names and header names that start with a
        // colon (created by old broken SPDY versions of the response cache).
        return addLenient("", line.substring(1)); // Empty header name.
      } else {
        return addLenient("", line); // No header name.
      }
    }
  • 1

从上面的代码可以看到,首先获取“:”的位置,如果存在“:”,那么调用addLenient将名和值添加进列表中,如果以”:”开宇,则头信息的名称为空,有值;如果都没有,那么没有头部信息名。三种情况都是调用addLenient方法,如下:

 /**
     * Add a field with the specified value without any validation. Only appropriate for headers
     * from the remote peer or cache.
     */
    Builder addLenient(String name, String value) {
      namesAndValues.add(name);
      namesAndValues.add(value.trim());
      return this;
    }

其中,nameAndValues是一个字符串的列表。
到上面为此,读取响应的头部信息已经完成,接下来在CallServerInterceptor中做的是调用openResponseBody方法读取响应的主体部分,方法如下:

@Override public ResponseBody openResponseBody(Response response) throws IOException {
    Source source = getTransferStream(response);
    return new RealResponseBody(response.headers(), Okio.buffer(source));
  }

 

从代码中可以看出,首先调用getTransferStream方法就行流转换,因为传入的Response中有头部信息,而头部信息中可能会有编码的信息,所以需要就行转换,然后再创建RealResponseBody对象返回。先看getTransferStream()方法的实现:

private Source getTransferStream(Response response) throws IOException {
    if (!HttpHeaders.hasBody(response)) {
      return newFixedLengthSource(0);
    }

    if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
      return newChunkedSource(response.request().url());
    }

    long contentLength = HttpHeaders.contentLength(response);
    if (contentLength != -1) {
      return newFixedLengthSource(contentLength);
    }

    // Wrap the input stream from the connection (rather than just returning
    // "socketIn" directly here), so that we can control its use after the
    // reference escapes.
    return newUnknownLengthSource();
  }

从代码中可以看到一共可能有四种返回值,分别是以下四种情况:
1. 如果响应主体部分不应有内容,那么返回newFixedLengthSource(0)
2. 如果响应头部中Transfer-Encoding为chunked,即分块了,那么返回newChunkedSource
3. 如果响应中有个具体长度,那么返回newFixedLengthSource,并且指定长度
4. 以上情况均不满足,返回newUnknownLengthSource

总结

至此,OkHttp的网络部分讲解结束。OkHttp中涉及到了几个重要的类,StreamAllocation负责根据请求创建连接,可能是新建一个连接,可能是重用自己内部的连接,也有可能是从连接池中获取连接;而连接的建立就涉及到了Socket的创建以及连接;当连接创建好后,就创建了HttpStream对象,负责操作底层Socket的输出输入流。
在整个OkHttp的工作流程中,在RetryAndFollowupInterceptor中创建StreamAllocation,在ConnectInterceptor中创建连接以及HttpStream对象,在CallServerInterceptor中操作HttpStream进行发送请求和读取响应。

以上是关于三、深入理解OkHttp:连接处理-ConnectIntercepter的主要内容,如果未能解决你的问题,请参考以下文章

TCP建立连接过程(深入源码理解3次握手)

深入理解函数式编程一种基于Consumer和Builder的OkHttp函数式编程方法

深入理解TCP协议及其源代码

深入理解TCP协议及其源代码

深入理解OkHttp源码——提交请求

OkHttp源码解析 (三)——代理和路由