面试官:“Okhttp连接池是咋实现“?你:该咋回答呢?
Posted 冬天的毛毛雨
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了面试官:“Okhttp连接池是咋实现“?你:该咋回答呢?相关的知识,希望对你有一定的参考价值。
背景
最近把Okhttp的源码又整理了下,之前也写过Okhttp源码的文章,我觉得那会对Okhttp的认识不够深入,所以这次还是像炒咸饭一样吗?no-no-no,这次我会整理点精华部分,让大家学习点东西,如标题所示,这次要讨论的话题是Okhttp的连接池怎么工作的,以及它工作的原理,为什么要整理这篇文章呢,因为okhttp的连接池在面试过程中很大可能被问到,因此在这里总结出来,供大家参考。
为了大家更好的理解发起同步和异步的过程,画了张草图给大家,如果有不正确的地方望指出:
- 我们知道Okhttp中通过okhttpClient对象是通过Builder对象初始化出来的,此处Builder的用法是建造者模式,建造者模式主要是分离出外部类的属性初始化,而初始化属性交给了内部类Buidler类,这么做的好处是外部类不用关心属性的初始化。 而在初始化的时候有
interceptors
、networkInterceptors
两种拦截器的初始化,还有dispatcher(分发器)
的初始化,以及后面需要讲到的cache(缓存)
初始化等。- 初始化完了后通过builder的build方法构造出
okhttpClient
对象,该类被称作客户端类,通过它的newCall方法返回RealCall对象,在newCall过程的过程中需要request的信息,request信息包装了url、method、headers、body等信息。最后通过RealCall的同步或异步方法交给了okhttpClient
的dispatcher
来处理,在处理同步或异步之前都会判断有没有正在executed
,所以我们不能对同一个RealCall调用异步或同步方法。- 在异步的时候会把RealCall给包装成一个
AsyncCall
,它是一个runnable对象。接着就来到了分发器异步处理部分,首先会把AsyncCall
加入到readyAsyncCalls
的集合中,该集合表示准备阶段的请求集合,紧接着从runningAsyncCalls(该集合装的都是要即将请求的集合)
和readyAsyncCalls
集合中找相同host的AsyncCall
,如果找到了会把当中记录的相同host的个数给该AsyncCall
。注意这里保存host个数用的原子性的AtomicInteger来记录的- 接着会去判断最大的请求是否大于64以及相同host是否大于5个,这里也是okhttp面试高频知识点,如果都通过的话,会把当前的
AsyncCall
的相同host记录数加一,接着会加入到runningAsyncCalls
集合中,接着循环遍历刚符合条件的AsyncCall
,通过线程池去执行AsyncCall
,注意此处的线程池的配置是没有核心线程,总的线程个数是没有限制的,也就是说都是非核心线程,并且个数没有限制,非核心线程等待的时间是60秒,并且使用的任务队列是SynchronousQueue,它是一个没有容量的阻塞队列,只会当里面没有任务的时候,才能往里面放任务,当放完之后,只能等它的任务被取走才能放,这不就是jdk里面提供的Executors.newCachedThreadPool线程池吗,可能是okhttp想自己定义线程工厂的参数吧,定义线程的名字。- 所以到这里才会进入到子线程,由于
AsyncCall
是一个runnable,因此最终执行来到了它的run方法吧,run方法最终会走到execute
方法,该方法来到了okhttp最有意思的单链表结构的拦截器部分,它会把所有的拦截器组装成一个集合,然后传给RealInterceptorChain
的process
方法,在该方法中,会先把下一个RealInterceptorChain
初始化出来,然后把下一个RealInterceptorChain
传给当前Interceptor的intercept
方法,最终一个个的response返回到AsyncCall
的execute
方法。- 处理完当前的
AsyncCall
后,会交给dispatcher
,它会将该AsyncCall
的host数减一,并且把它从runningAsyncCalls
集合中移除,接着再从readyAsyncCalls
集合中拿剩下的AsyncCall
继续执行,直到执行完readyAsyncCalls
里面的AsyncCall
。
关于okhttp发起异步和同步请求可以看这里OkHttp 源码解析
或者看我之前分析的okhttpOkHttp大流程分析
这就是整个okhttp的执行流程,而最重要的是拦截器部分,我会在这章介绍拦截器的连接池部分,先通过源码的形式介绍它的来龙去脉:
连接池的意义
- 频繁的进行建立Sokcet连接(TCP三次握手)和断开Socket(TCP四次分手)是非常消耗网络资源和浪费时间的,HTTP中的keepalive连接对于 降低延迟和提升速度有非常重要的作用。
- 复用连接就需要对连接进行管理,这里就引入了连接池的概念。
- Okhttp支持5个并发KeepAlive,默认链路生命为5分钟(链路空闲后,保持存活的时间),连接池有ConectionPool实现,对连接进行回收和管理。
概要
连接池部分主要是在RealConnectionPool
类中,该类用connections(双端队列)
存储所有的连接,cleanupRunnable
是专门用来清除超时的RealConnection
,既然有清除的任务,那肯定有清除的线程池,没错,该线程池(executor)
跟okhttp处理异步时候的线程池是一样的,keepAliveDurationNs
表示每一个连接keep-alive的时间,默认是5分钟,maxIdleConnections
连接池的最大容量,默认是5个。RealConnection
中有transmitters
字段,用来保存该连接的transmitter个数,通过里面的transmitter个数来标记该RealConnection
有没有在使用中。
注:如果大家只是想关心面试过程中怎么针对面试官问连接池的问题,可以直接看文章结尾哟
源码分析
1.RealCall#getResponseWithInterceptorChain
拦截器的组装是在RealCall的getResponseWithInterceptorChain
方法中放到集合里面了:
Response getResponseWithInterceptorChain() throws IOException {
//所有拦截器的组装集合
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(new RetryAndFollowUpInterceptor(client));
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
//连接池的使用在这里面
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
try {
Response response = chain.proceed(originalRequest);
return response;
} catch (IOException e) {
} finally {
}
}
}
2.ConnectInterceptor#intercept
连接池在ConnectInterceptor
中包装起来了,我们进去瞄一眼:
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
//拿到chain的Transmitter,里面包装了RealCall、okhttpClient、connectionPool等信息
Transmitter transmitter = realChain.transmitter();
//是否不是get请求
boolean doExtensiveHealthChecks = !request.method().equals("GET");
Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
return realChain.proceed(request, transmitter, exchange);
}
3.Transmitter#newExchange
拿到chain的Transmitter
对象,该对象是RealCall
、okhttpClient
、connectionPool
等信息的包装类,将是否不是get请求的标识传给了transmitter
的newExchange
方法:
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
//exchangeFinder是对connectionPool、connectionPool、request等信息的包装,它是在RetryAndFollowUpInterceptor拦截器中初始化出来的
ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
synchronized (connectionPool) {
this.exchange = result;
return result;
}
}
4.ExchangeCodec#find
该方法中将ExchangeCodec
的获取交给了exchangeFinder
对象,ExchangeCodec
是一个接口,实现类有Http2ExchangeCodec
和Http1ExchangeCodec
,这两个类表示http1和http2的建立连接的类,里面实现了writeRequestHeaders
和createRequestBody
等方法,这两个方法是在CallServerInterceptor
拦截器中使用的。exchangeFinder
是对connectionPool、connectionPool、request等信息的包装,它是在RetryAndFollowUpInterceptor拦截器中初始化出来的。我们接着看exchangeFinder
的find方法:
public ExchangeCodec find(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
return resultConnection.newCodec(client, chain);
} catch (RouteException e) {
} catch (IOException e) {
}
}
5.ExchangeCodec#findHealthyConnection
这个方法没什么好说的,就一句,看findHealthyConnection
方法:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
//开启循环找符合条件的RealConnection
while (true) {
//找当前keep-alive有效时间内的连接
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
//如果该连接没有出现过连接失败才会返回,连接失败的标识是在Exchange中处理的,而Exchange中的处理是交给了CallServerInterceptor的
if (candidate.successCount == 0) {
return candidate;
}
}
//如果socket连接断开了或者失败了也认为不是有效的连接
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
candidate.noNewExchanges();
continue;
}
return candidate;
}
}
6.ExchangeCodec#findConnection
该方法会开启循环来找符合keep-alive有效时间内的连接,紧接着判断它是否在CallServerInterceptor拦截器处理过程中出现连接失败或者是该连接的socket连接断开了也认为不是有效的连接。我们主要看findConnection怎么处理keep-alive
的有效时间的连接:
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
//定义最终找到的那个RealConnection
RealConnection result = null;
Route selectedRoute = null;
//用来标识有没有分配的连接
RealConnection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (transmitter.isCanceled()) throw new IOException("Canceled");
hasStreamFailure = false; // This is a fresh attempt.
//获取当前分配的连接的路由信息
Route previousRoute = retryCurrentRoute()
? transmitter.connection.route()
: null;
//获取有没有分配过的连接
releasedConnection = transmitter.connection;
toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
? transmitter.releaseConnectionNoEvents()
: null;
//看当前正在分配的连接有没有
if (transmitter.connection != null) {
result = transmitter.connection;
releasedConnection = null;
}
//如果上面都没找到从连接池中找连接
if (result == null) {
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
foundPooledConnection = true;
result = transmitter.connection;
} else {
selectedRoute = previousRoute;
}
}
}
closeQuietly(toClose);
//有的话直接返回
if (result != null) {
return result;
}
//检查有没有新的路由
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
List<Route> routes = null;
synchronized (connectionPool) {
//如果有新的路由信息
if (newRouteSelection) {
routes = routeSelection.getAll();
//再次从连接池中找
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false)) {
foundPooledConnection = true;
result = transmitter.connection;
}
}
// 没有从连接池中找到连接
if (!foundPooledConnection) {
//创建连接
result = new RealConnection(connectionPool, selectedRoute);
connectingConnection = result;
}
}
// 从连接池中找到返回连接
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// 准别TCP连接
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
connectionPool.routeDatabase.connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
connectingConnection = null;
//继续从连接池中找一次看有没有符合条件的连接池
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
result.noNewExchanges = true;
socket = result.socket();
result = transmitter.connection;
} else {
//如果不符合条件则把当前连接放到连接池里面
connectionPool.put(result);
}
}
closeQuietly(socket);
return result;
}
首先会先判断有没有被分配的连接,如果没有,从连接池中找符合条件的连接,通过connectionPool.transmitterAcquirePooledConnection
去找,如果返回true说明找到了,找到后会把connection放到transmitter里面,如果连接池里面都没获取到连接,则创建连接,创建完了之后准备TCP连接,接着又从连接池中再次获取一次,如果再获取不成功,就把创建的连接放到连接池里面。 这里面涉及到okhttp的代理和路由的获取,这块笔者也不是很懂,所以不在这里叙述
7.RealConnectionPool#transmitterAcquirePooledConnection
上面我们知道,通过connectionPool.transmitterAcquirePooledConnection
获取连接,通过connectionPool.put
往连接池中放连接,我们先来看如何获取有效的连接的:
boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
@Nullable List<Route> routes, boolean requireMultiplexed) {
//connections是一个Deque的队列,也就是双端队列
for (RealConnection connection : connections) {
//默认requireMultiplexed为false
if (requireMultiplexed && !connection.isMultiplexed()) continue;
//判断当前连接是不是合格的
if (!connection.isEligible(address, routes)) continue;
//给transmitter的connection对象赋值
//向connection的transmitters中添加transmitter对象
transmitter.acquireConnectionNoEvents(connection);
return true;
}
return false;
}
okhttp中连接池存储空间是一个队列,并且该队列是一个ArrayDeque,它是一个双端队列:
private final Deque<RealConnection> connections = new ArrayDeque<>();
8.RealConnection#isEligible
如果连接不是合格的则直接跳过该连接,接着操作了transmitter
的acquireConnectionNoEvents
方法,我们分别看isEligible(合格判断)
和acquireConnectionNoEvents
方法:
boolean isEligible(Address address, @Nullable List<Route> routes) {
// 如果url的host和当前连接的路由中的地址host相同则认为是合格的,说白了就是比较两个连接的host是否相同
//所以连接池复用的连接是以host相同为合格的
if (address.url().host().equals(this.route().address().url().host())) {
return true;
}
return true;
}
关于是否合格的判断我去掉了其他的判断,只留下了host比较的代码,如果两个连接host相同,则认为是要找的连接,否则就不是。
9.Transmitter#acquireConnectionNoEvents
void acquireConnectionNoEvents(RealConnection connection) {
//把当前的连接给Transmitter中
this.connection = connection;
//给当前的连接的transmitters添加一个transmitter,注意这里是通过弱引用来包装的,注意该集合会在后面clean的时候有用到
connection.transmitters.add(new TransmitterReference(this, callStackTrace));
}
这里给当前Transmitter的connection附上当前的connection,因为后面要返回它给result。接着给当前的连接的transmitters集合添加一个弱引用的transmitters对象,在开篇已经说了 realConnection的transmitters集合的个数是在clean的时候用来判断该realConnection有没有正在被用到。
如果上面条件都满足,则transmitterAcquirePooledConnection
返回true,表示从连接池中寻找realConnection成功了。
10.RealConnectionPool#put
说完了从连接池中查找过程,我们接着来到连接池存储realConnection过程,上面已经分析了存储是在RealConnectionPool
的put方法:
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
//如果没有正在清除的工作
if (!cleanupRunning) {
cleanupRunning = true;
//这就是开篇说的清除realConnection的线程池
executor.execute(cleanupRunnable);
}
//清除完了后,把连接加入到队列中
connections.add(connection);
}
11.RealConnectionPool#cleanupRunnable
put过程很简单,先是做清除工作,然后做添加操作,我们着重看下清除怎么实现的,直接看cleanupRunnable
的定义:
private final Runnable cleanupRunnable = () -> {
//开启了死循环进行删除操作
while (true) {
//真正实现清除的方法,并且返回要等多久才能进行下次的清除
long waitNanos = cleanup(System.nanoTime());
//如果返回-1说明没有要清除的realConnection
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (RealConnectionPool.this) {
try {
//这里让当前线程直接在同步代码块里面实现等待,等待时间是waitMillis然后继续执行清除操作
RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
};
开启死循环进行删除操作,删除操作是在cleanup
中,该方法会返回要等待的时间才能执行下次的清除操作,如果返回-1说明没有要清除的realConnection了,通过同步代码块指定该线程要等待多少秒才能继续下次清除操作。
12.RealConnectionPool#cleanup
ok,我们把重心放在cleanup方法中:
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext()如果你是一个 Java 面试官,你会问哪些问题?