面试官:“Okhttp连接池是咋实现“?你:该咋回答呢?

Posted 冬天的毛毛雨

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了面试官:“Okhttp连接池是咋实现“?你:该咋回答呢?相关的知识,希望对你有一定的参考价值。

背景

最近把Okhttp的源码又整理了下,之前也写过Okhttp源码的文章,我觉得那会对Okhttp的认识不够深入,所以这次还是像炒咸饭一样吗?no-no-no,这次我会整理点精华部分,让大家学习点东西,如标题所示,这次要讨论的话题是Okhttp的连接池怎么工作的,以及它工作的原理,为什么要整理这篇文章呢,因为okhttp的连接池在面试过程中很大可能被问到,因此在这里总结出来,供大家参考。

为了大家更好的理解发起同步和异步的过程,画了张草图给大家,如果有不正确的地方望指出:

  • 我们知道Okhttp中通过okhttpClient对象是通过Builder对象初始化出来的,此处Builder的用法是建造者模式,建造者模式主要是分离出外部类的属性初始化,而初始化属性交给了内部类Buidler类,这么做的好处是外部类不用关心属性的初始化。 而在初始化的时候有interceptorsnetworkInterceptors两种拦截器的初始化,还有dispatcher(分发器)的初始化,以及后面需要讲到的cache(缓存)初始化等。
  • 初始化完了后通过builder的build方法构造出okhttpClient对象,该类被称作客户端类,通过它的newCall方法返回RealCall对象,在newCall过程的过程中需要request的信息,request信息包装了url、method、headers、body等信息。最后通过RealCall的同步或异步方法交给了okhttpClientdispatcher来处理,在处理同步或异步之前都会判断有没有正在executed,所以我们不能对同一个RealCall调用异步或同步方法。
  • 在异步的时候会把RealCall给包装成一个AsyncCall,它是一个runnable对象。接着就来到了分发器异步处理部分,首先会把AsyncCall加入到readyAsyncCalls的集合中,该集合表示准备阶段的请求集合,紧接着从runningAsyncCalls(该集合装的都是要即将请求的集合)readyAsyncCalls集合中找相同host的AsyncCall,如果找到了会把当中记录的相同host的个数给该AsyncCall注意这里保存host个数用的原子性的AtomicInteger来记录的
  • 接着会去判断最大的请求是否大于64以及相同host是否大于5个,这里也是okhttp面试高频知识点,如果都通过的话,会把当前的AsyncCall的相同host记录数加一,接着会加入到runningAsyncCalls集合中,接着循环遍历刚符合条件的AsyncCall,通过线程池去执行AsyncCall,注意此处的线程池的配置是没有核心线程,总的线程个数是没有限制的,也就是说都是非核心线程,并且个数没有限制,非核心线程等待的时间是60秒,并且使用的任务队列是SynchronousQueue,它是一个没有容量的阻塞队列,只会当里面没有任务的时候,才能往里面放任务,当放完之后,只能等它的任务被取走才能放,这不就是jdk里面提供的Executors.newCachedThreadPool线程池吗,可能是okhttp想自己定义线程工厂的参数吧,定义线程的名字。
  • 所以到这里才会进入到子线程,由于AsyncCall是一个runnable,因此最终执行来到了它的run方法吧,run方法最终会走到execute方法,该方法来到了okhttp最有意思的单链表结构的拦截器部分,它会把所有的拦截器组装成一个集合,然后传给RealInterceptorChainprocess方法,在该方法中,会先把下一个RealInterceptorChain初始化出来,然后把下一个RealInterceptorChain传给当前Interceptor的intercept方法,最终一个个的response返回到AsyncCallexecute方法。
  • 处理完当前的AsyncCall后,会交给dispatcher,它会将该AsyncCall的host数减一,并且把它从runningAsyncCalls集合中移除,接着再从readyAsyncCalls集合中拿剩下的AsyncCall继续执行,直到执行完readyAsyncCalls里面的AsyncCall

关于okhttp发起异步和同步请求可以看这里OkHttp 源码解析

或者看我之前分析的okhttpOkHttp大流程分析

这就是整个okhttp的执行流程,而最重要的是拦截器部分,我会在这章介绍拦截器的连接池部分,先通过源码的形式介绍它的来龙去脉:

连接池的意义

  • 频繁的进行建立Sokcet连接(TCP三次握手)和断开Socket(TCP四次分手)是非常消耗网络资源和浪费时间的,HTTP中的keepalive连接对于 降低延迟和提升速度有非常重要的作用。
  • 复用连接就需要对连接进行管理,这里就引入了连接池的概念。
  • Okhttp支持5个并发KeepAlive,默认链路生命为5分钟(链路空闲后,保持存活的时间),连接池有ConectionPool实现,对连接进行回收和管理。

概要

连接池部分主要是在RealConnectionPool类中,该类用connections(双端队列)存储所有的连接,cleanupRunnable是专门用来清除超时的RealConnection,既然有清除的任务,那肯定有清除的线程池,没错,该线程池(executor)跟okhttp处理异步时候的线程池是一样的,keepAliveDurationNs表示每一个连接keep-alive的时间,默认是5分钟,maxIdleConnections连接池的最大容量,默认是5个。RealConnection中有transmitters字段,用来保存该连接的transmitter个数,通过里面的transmitter个数来标记该RealConnection有没有在使用中。

注:如果大家只是想关心面试过程中怎么针对面试官问连接池的问题,可以直接看文章结尾哟

源码分析

1.RealCall#getResponseWithInterceptorChain

拦截器的组装是在RealCall的getResponseWithInterceptorChain方法中放到集合里面了:

Response getResponseWithInterceptorChain() throws IOException {
    //所有拦截器的组装集合
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //连接池的使用在这里面
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());
    try {
      Response response = chain.proceed(originalRequest);
      return response;
    } catch (IOException e) {

    } finally {

    }
  }
}

2.ConnectInterceptor#intercept

连接池在ConnectInterceptor中包装起来了,我们进去瞄一眼:

@Override
public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Request request = realChain.request();
  //拿到chain的Transmitter,里面包装了RealCall、okhttpClient、connectionPool等信息
  Transmitter transmitter = realChain.transmitter();
  //是否不是get请求
  boolean doExtensiveHealthChecks = !request.method().equals("GET");
  Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

  return realChain.proceed(request, transmitter, exchange);
}

3.Transmitter#newExchange

拿到chain的Transmitter对象,该对象是RealCallokhttpClientconnectionPool等信息的包装类,将是否不是get请求的标识传给了transmitternewExchange方法:

Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
  //exchangeFinder是对connectionPool、connectionPool、request等信息的包装,它是在RetryAndFollowUpInterceptor拦截器中初始化出来的
  ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
  Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
  synchronized (connectionPool) {
    this.exchange = result;
    return result;
  }
}

4.ExchangeCodec#find

该方法中将ExchangeCodec的获取交给了exchangeFinder对象,ExchangeCodec是一个接口,实现类有Http2ExchangeCodecHttp1ExchangeCodec,这两个类表示http1和http2的建立连接的类,里面实现了writeRequestHeaderscreateRequestBody等方法,这两个方法是在CallServerInterceptor拦截器中使用的。exchangeFinder是对connectionPool、connectionPool、request等信息的包装,它是在RetryAndFollowUpInterceptor拦截器中初始化出来的。我们接着看exchangeFinder的find方法:

public ExchangeCodec find(
    OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
  int connectTimeout = chain.connectTimeoutMillis();
  int readTimeout = chain.readTimeoutMillis();
  int writeTimeout = chain.writeTimeoutMillis();
  int pingIntervalMillis = client.pingIntervalMillis();
  boolean connectionRetryEnabled = client.retryOnConnectionFailure();

  try {
    RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
        writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
    return resultConnection.newCodec(client, chain);
  } catch (RouteException e) {
  } catch (IOException e) {

  }
}

5.ExchangeCodec#findHealthyConnection

这个方法没什么好说的,就一句,看findHealthyConnection方法:

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
    int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
    boolean doExtensiveHealthChecks) throws IOException {
  //开启循环找符合条件的RealConnection
  while (true) {
    //找当前keep-alive有效时间内的连接
    RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
        pingIntervalMillis, connectionRetryEnabled);

    // If this is a brand new connection, we can skip the extensive health checks.
    synchronized (connectionPool) {
      //如果该连接没有出现过连接失败才会返回,连接失败的标识是在Exchange中处理的,而Exchange中的处理是交给了CallServerInterceptor的
      if (candidate.successCount == 0) {
        return candidate;
      }
    }

    //如果socket连接断开了或者失败了也认为不是有效的连接
    if (!candidate.isHealthy(doExtensiveHealthChecks)) {
      candidate.noNewExchanges();
      continue;
    }

    return candidate;
  }
}

6.ExchangeCodec#findConnection

该方法会开启循环来找符合keep-alive有效时间内的连接,紧接着判断它是否在CallServerInterceptor拦截器处理过程中出现连接失败或者是该连接的socket连接断开了也认为不是有效的连接。我们主要看findConnection怎么处理keep-alive的有效时间的连接:

  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    //定义最终找到的那个RealConnection
    RealConnection result = null;
    Route selectedRoute = null;
    //用来标识有没有分配的连接
    RealConnection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");
      hasStreamFailure = false; // This is a fresh attempt.
      //获取当前分配的连接的路由信息
      Route previousRoute = retryCurrentRoute()
          ? transmitter.connection.route()
          : null;

      //获取有没有分配过的连接
      releasedConnection = transmitter.connection;
      toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
          ? transmitter.releaseConnectionNoEvents()
          : null;

      //看当前正在分配的连接有没有
      if (transmitter.connection != null) {
        result = transmitter.connection;
        releasedConnection = null;
      }

      //如果上面都没找到从连接池中找连接
      if (result == null) {
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        } else {
          selectedRoute = previousRoute;
        }
      }
    }
    closeQuietly(toClose);

    //有的话直接返回
    if (result != null) {
      return result;
    }

    //检查有没有新的路由
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    List<Route> routes = null;
    synchronized (connectionPool) {
      //如果有新的路由信息
      if (newRouteSelection) {
        routes = routeSelection.getAll();
        //再次从连接池中找
        if (connectionPool.transmitterAcquirePooledConnection(
            address, transmitter, routes, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        }
      }

      // 没有从连接池中找到连接
      if (!foundPooledConnection) {
        //创建连接
        result = new RealConnection(connectionPool, selectedRoute);
        connectingConnection = result;
      }
    }

    // 从连接池中找到返回连接
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // 准别TCP连接
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    connectionPool.routeDatabase.connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      connectingConnection = null;
      //继续从连接池中找一次看有没有符合条件的连接池
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        result.noNewExchanges = true;
        socket = result.socket();
        result = transmitter.connection;
      } else {
        //如果不符合条件则把当前连接放到连接池里面
        connectionPool.put(result);
      }
    }
    closeQuietly(socket);
    return result;
}

首先会先判断有没有被分配的连接,如果没有,从连接池中找符合条件的连接,通过connectionPool.transmitterAcquirePooledConnection去找,如果返回true说明找到了,找到后会把connection放到transmitter里面,如果连接池里面都没获取到连接,则创建连接,创建完了之后准备TCP连接,接着又从连接池中再次获取一次,如果再获取不成功,就把创建的连接放到连接池里面。 这里面涉及到okhttp的代理和路由的获取,这块笔者也不是很懂,所以不在这里叙述

7.RealConnectionPool#transmitterAcquirePooledConnection

上面我们知道,通过connectionPool.transmitterAcquirePooledConnection获取连接,通过connectionPool.put往连接池中放连接,我们先来看如何获取有效的连接的:

boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
    @Nullable List<Route> routes, boolean requireMultiplexed) { 
  //connections是一个Deque的队列,也就是双端队列
  for (RealConnection connection : connections) {
    //默认requireMultiplexed为false
    if (requireMultiplexed && !connection.isMultiplexed()) continue;
    //判断当前连接是不是合格的
    if (!connection.isEligible(address, routes)) continue;
    //给transmitter的connection对象赋值
    //向connection的transmitters中添加transmitter对象
    transmitter.acquireConnectionNoEvents(connection);
    return true;
  }
  return false;
}

okhttp中连接池存储空间是一个队列,并且该队列是一个ArrayDeque,它是一个双端队列:

private final Deque<RealConnection> connections = new ArrayDeque<>();

8.RealConnection#isEligible

如果连接不是合格的则直接跳过该连接,接着操作了transmitteracquireConnectionNoEvents方法,我们分别看isEligible(合格判断)acquireConnectionNoEvents方法:

boolean isEligible(Address address, @Nullable List<Route> routes) {

  // 如果url的host和当前连接的路由中的地址host相同则认为是合格的,说白了就是比较两个连接的host是否相同
  //所以连接池复用的连接是以host相同为合格的
  if (address.url().host().equals(this.route().address().url().host())) {
    return true; 
  }
  return true;
}

关于是否合格的判断我去掉了其他的判断,只留下了host比较的代码,如果两个连接host相同,则认为是要找的连接,否则就不是。

9.Transmitter#acquireConnectionNoEvents

void acquireConnectionNoEvents(RealConnection connection) {
  //把当前的连接给Transmitter中
  this.connection = connection;
  //给当前的连接的transmitters添加一个transmitter,注意这里是通过弱引用来包装的,注意该集合会在后面clean的时候有用到
  connection.transmitters.add(new TransmitterReference(this, callStackTrace));
}

这里给当前Transmitter的connection附上当前的connection,因为后面要返回它给result。接着给当前的连接的transmitters集合添加一个弱引用的transmitters对象,在开篇已经说了 realConnection的transmitters集合的个数是在clean的时候用来判断该realConnection有没有正在被用到。

如果上面条件都满足,则transmitterAcquirePooledConnection返回true,表示从连接池中寻找realConnection成功了。

10.RealConnectionPool#put

说完了从连接池中查找过程,我们接着来到连接池存储realConnection过程,上面已经分析了存储是在RealConnectionPool的put方法:

void put(RealConnection connection) {
  assert (Thread.holdsLock(this));
  //如果没有正在清除的工作
  if (!cleanupRunning) {
    cleanupRunning = true;
    //这就是开篇说的清除realConnection的线程池
    executor.execute(cleanupRunnable);
  }
  //清除完了后,把连接加入到队列中
  connections.add(connection);
}

11.RealConnectionPool#cleanupRunnable

put过程很简单,先是做清除工作,然后做添加操作,我们着重看下清除怎么实现的,直接看cleanupRunnable的定义:

private final Runnable cleanupRunnable = () -> {
  //开启了死循环进行删除操作
  while (true) {
    //真正实现清除的方法,并且返回要等多久才能进行下次的清除
    long waitNanos = cleanup(System.nanoTime());
    //如果返回-1说明没有要清除的realConnection
    if (waitNanos == -1) return;
    if (waitNanos > 0) {
      long waitMillis = waitNanos / 1000000L;
      waitNanos -= (waitMillis * 1000000L);
      synchronized (RealConnectionPool.this) {
        try {
          //这里让当前线程直接在同步代码块里面实现等待,等待时间是waitMillis然后继续执行清除操作
          RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
        } catch (InterruptedException ignored) {
        }
      }
    }
  }
};

开启死循环进行删除操作,删除操作是在cleanup中,该方法会返回要等待的时间才能执行下次的清除操作,如果返回-1说明没有要清除的realConnection了,通过同步代码块指定该线程要等待多少秒才能继续下次清除操作。

12.RealConnectionPool#cleanup

ok,我们把重心放在cleanup方法中:

long cleanup(long now) {
  int inUseConnectionCount = 0;
  int idleConnectionCount = 0;
  RealConnection longestIdleConnection = null;
  long longestIdleDurationNs = Long.MIN_VALUE;
  synchronized (this) {
    for (Iterator<RealConnection> i = connections.iterator(); i.hasNext()如果你是一个 Java 面试官,你会问哪些问题?

rm搭配grep删除符合条件的文件

你应该问面试官的10个相关问题

java面试,面试官最后说,你还有什么要问的吗?我该怎么回答?

面试官之问:知道你的接口“QPS”是多少吗? 怎么办

面试官问你多线程你该怎么回答