ConnectInterceptor 解析——OkHttp 源码详解
Posted 薛瑄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ConnectInterceptor 解析——OkHttp 源码详解相关的知识,希望对你有一定的参考价值。
前言
框架和流程——OkHttp 源码详解(一)
ConnectInterceptor 解析——OkHttp 源码详解(二)
OkHttp 主要的逻辑实现都在拦截器,拦截器中的重点,就是与服务端建立连接,这个过程包括了,dns,代理选择,tls连接,socket连接等,下面我们就从源码一步步分析
源码分析
下面从拦截器ConnectInterceptor 开始分析:
代码一
代码一:
/**
* Opens a connection to the target server and proceeds to the next interceptor. The network might
* be used for the returned response, or to validate a cached response with a conditional GET.
*/
object ConnectInterceptor : Interceptor
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response
//看过上篇文章的,应该很熟悉了,每一个Interceptor 都会被包裹为RealInterceptorChain,注意,这里的chain,是下一个Interceptor
val realChain = chain as RealInterceptorChain
//这里的call,就是RealCall,下面对initExchange 展开进行描述,见代码二
val exchange = realChain.call.initExchange(chain)
//使用参数,对下一个Interceptor 进行拷贝
val connectedChain = realChain.copy(exchange = exchange)
//执行下一个Interceptor
return connectedChain.proceed(realChain.request)
代码二
代码二:
在 RealCall.kt 中
/** Finds a new or pooled connection to carry a forthcoming request and response. */
internal fun initExchange(chain: RealInterceptorChain): Exchange
synchronized(this)
check(expectMoreExchanges) "released"
check(!responseBodyOpen)
check(!requestBodyOpen)
//exchangeFinder 对象是在RetryAndFollowUpInterceptor中创建的,见代码三
val exchangeFinder = this.exchangeFinder!!
//find 创建连接或者在连接池里找到一个可用链接
// codec ExchangeCodec 接口,它是一个socket 链接,用来传输request和response,
//有两种实现:Http1ExchangeCodec ,Http2ExchangeCodec
val codec = exchangeFinder.find(client, chain)
//创建了一个Exchange,它负责单个http 请求和响应对,真正的实现是在codec 中(实际的IO操作)
val result = Exchange(this, eventListener, exchangeFinder, codec)
//保存这个Exchange 对象,在RealCall对象的变量中
this.interceptorScopedExchange = result
this.exchange = result
synchronized(this)
this.requestBodyOpen = true
this.responseBodyOpen = true
if (canceled) throw IOException("Canceled")
return result
因为本篇文章,主要是研究,链接建立的过程,所以关于Exchange传输数据的就先不展开,下面来看一个 ExchangeFinder是在哪里 创建的
代码三
在RetryAndFollowUpInterceptor 的intercept 中,调用了下面的函数,为RealCall 对象,创建了一个ExchangeFinder
代码三:
在 RealCall.kt 中
fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean)
check(interceptorScopedExchange == null)
synchronized(this)
check(!responseBodyOpen)
"cannot make a new request because the previous response is still open: " +
"please call response.close()"
check(!requestBodyOpen)
if (newExchangeFinder)
//创建一个ExchangeFinder,在它里面实现了Dns,代理选择,路由选择,tls连接,socket连接
//具体每个功能的实现,也是有不同的类去完成,下面会一一详解RetryAndFollowUpInterceptor
this.exchangeFinder = ExchangeFinder(
connectionPool,
createAddress(request.url),
this,
eventListener
)
下面就看看代码二中的exchangeFinder.find(client, chain)
函数,看看链接建立的详细过程
代码四
代码四:
在ExchangeFinder.kt 中
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec
try
// 下面到findHealthyConnection函数中一看究竟
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
//resultConnection 是 RealConnection 类型,它负责建立socket链接 ,建立tls链接,建立Tunnel(通过http代理,访问https 服务器)等操作
//面对http1和http2,又有各自不同的传输方式,所以使用newCodec() 抽象为ExchangeCodec,http1和http2 实现不同的接口功能(策略者模式)
return resultConnection.newCodec(client, chain)
catch (e: RouteException)
trackFailure(e.lastConnectException)
throw e
catch (e: IOException)
trackFailure(e)
throw RouteException(e)
下面进入findHealthyConnection 函数中
代码五
代码五:
在ExchangeFinder.kt 中
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
@Throws(IOException::class)
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection
//这里是一个死循环,退出循环的两种情况:1、得到一个健康的连接 2、抛出一个异常
while (true)
//得到一个连接,新建或者在连接池中,见代码六
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// Confirm that the connection is good.
if (candidate.isHealthy(doExtensiveHealthChecks))
return candidate
// 如果连接,不可用,就从连接池中删除
candidate.noNewExchanges()
// Make sure we have some routes left to try. One example where we may exhaust all the routes
// would happen if we made a new connection and it immediately is detected as unhealthy.
if (nextRouteToTry != null) continue
val routesLeft = routeSelection?.hasNext() ?: true
if (routesLeft) continue
val routesSelectionLeft = routeSelector?.hasNext() ?: true
if (routesSelectionLeft) continue
throw IOException("exhausted all routes")
下面涉及的类比较多,来几张图,从总体的认识一下
RouteSelector 是用来根据代理创建Route, 然后封装到Selection
Selection 针对某一个代理的 一系列的Route,切换代理后,该对象也会跟着改变
Route 一个Url可能会对应多个IP地址(DNS负载均衡),每个socket 对应一个Route 对象
Address 包含 一个Url,指定的Proxy(可为空), ProxySelector (可能会有许多Proxy)
这几个类的关系图
下图是,建立连接的流程图:
代码六
代码六:
在ExchangeFinder.kt 类中
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*
* This checks for cancellation before each blocking operation.
*/
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection
//如果当前的请求已经取消,就抛出异常
if (call.isCanceled()) throw IOException("Canceled")
//下面是使用RealCall 对象当前的连接
// Attempt to reuse the connection from the call.
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null)
var toClose: Socket? = null
synchronized(callConnection)
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url))
toClose = call.releaseConnectionNoEvents()
// If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
// because we already acquired it.
if (call.connection != null)
check(toClose == null)
return callConnection
// The call's connection was released.
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
//2、上面RealCall 对象的连接不可用,We need a new connection. Give it fresh stats.
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// Attempt to get a connection from the pool.
//callAcquirePooledConnection 函数,下面会详细分析
if (connectionPool.callAcquirePooledConnection(address, call, null, false))
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
// 在连接池中也没有找到可用的,就去找Route,然后用Route创建一个连接
// 寻找Route 的步骤:
//1、先看nextRouteToTry 有没有值,(如果有值,就是上一次循环赋值的),拿出来使用
//2、nextRouteToTry 为空,去Selection 中去找(Selection 就是一些列Route的集合)
//3、如果Selection 中也没有,就去RouteSelector中,先找一个Selection,再在其中找Route
// 如果是第一次进来,那么会到第三种情况,先创建一个RouteSelector,next() 后Selection,Route 就都有了
// Nothing in the pool. Figure out what route we'll try next.
val routes: List<Route>?
val route: Route
if (nextRouteToTry != null)
// Use a route from a preceding coalesced connection.
routes = null
route = nextRouteToTry!!
nextRouteToTry = null
else if (routeSelection != null && routeSelection!!.hasNext())
// Use a route from an existing route selection.
routes = null
route = routeSelection!!.next()
else
// Compute a new route selection. This is a blocking operation!
var localRouteSelector = routeSelector
if (localRouteSelector == null)
//代码七 会详细介绍RouteSelector
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
val localRouteSelection = localRouteSelector.next()
routeSelection = localRouteSelection
routes = localRouteSelection.routes
if (call.isCanceled()) throw IOException("Canceled")
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. We have a better chance of matching thanks to connection coalescing.
//注意这里参数,有传入routes 参数,
if (connectionPool.callAcquirePooledConnection(address, call, routes, false))
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
route = localRouteSelection.next()
// 代码如果执行到这里,就说明还没有找到可用连接,但是找到了Route
// Connect. Tell the call about the connecting call so async cancels work.
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try
// 建立socket 连接,建立tls连接,建立tunnel
// 见代码十:
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
finally
call.connectionToCancel = null
call.client.routeDatabase.connected(newConnection.route())
// If we raced another call connecting to this host, coalesce the connections. This makes for 3
// different lookups in the connection pool!
// 最后一个参数是true,查找是否有多路复用(http2)的可用链接,如果有的话,就使用,并刚刚新创建的链接
if (connectionPool.callAcquirePooledConnection(address, call, routes, true))
val result = call.connection!!
//把上面找到的route 保存起来,如果当前返回的连接不健康,下次就尝试这个route
nextRouteToTry = route
newConnection.socket().closeQuietly()
//调用事件监听函数
eventListener.connectionAcquired(call, result)
return result
synchronized(newConnection)
//放入连接池中
connectionPool.put(newConnection)
//见代码九分析
call.acquireConnectionNoEvents(newConnection)
//调用事件监听函数
eventListener.connectionAcquired(call, newConnection)
return newConnection
下面来详细分析一下RouteSelector、然后在代码八分析 callAcquirePooledConnection 是如何在连接池中选择一个连接的。
来看看RouteSelector 类的完整代码
代码七
代码七:
在RouteSelector.kt 类中
/**
* Selects routes to connect to an origin server. Each connection requires a choice of proxy server,
* IP address, and TLS mode. Connections may also be recycled.
*/
class RouteSelector(
//在RealCall 中创建的address 对象,存放url,port,proxy,proxySelector(proxy 为空时 使用的)
private val address: Address,
private val routeDatabase: RouteDatabase,
private val call: Call,
private val eventListener: EventListener
)
/* State for negotiating the next proxy to use. */
private var proxies = emptyList<Proxy>()
private var nextProxyIndex: Int = 0
/* State for negotiating the next socket address to use. */
private var inetSocketAddresses = emptyList<InetSocketAddress>()
/* State for negotiating failed routes */
private val postponedRoutes = mutableListOf<Route>()
init
//首先会调用这里
resetNextProxy(address.url, address.proxy)
// 把设置的代理都找出来,如果没有设置代理,则使用proxySelector,
private fun resetNextProxy(url: HttpUrl, proxy: Proxy?)
fun selectProxies(): List<Proxy>
// If the user specifies a proxy, try that and only that.
if (proxy != null) return listOf(proxy)
// If the URI lacks a host (as in "http://</"), don't call the ProxySelector.
val uri = url.toUri()
if (uri.host == null) return immutableListOf(Proxy.NO_PROXY)
// Try each of the ProxySelector choices until one connection succeeds.
//注意这里的proxySelector( 类型是ProxySelector抽象类),对proxySelector 赋值 ,默认通过反射找到"sun.net.spi.DefaultProxySelector",如果为空,则使用NullProxySelector
val proxiesOrNull = address.proxySelector.select(uri)
//如果没有找到代理,就返回一个list,其中Proxy 的参数是直接相连,也就是没有代理
if (proxiesOrNull.isNullOrEmpty()) return immutableListOf(Proxy.NO_PROXY)
return proxiesOrNull.toImmutableList()
eventListener.proxySelectStart(call, url)
proxies = selectProxies()
//这里赋初值,后面会根据这个字段来判断,是否还有代理
nextProxyIndex = 0
eventListener.proxySelectEnd(call, url, proxies)
/**
* Returns true if there's another set of routes to attempt. Every address has at least one route.
*/
operator fun hasNext(): Boolean = hasNextProxy() || postponedRoutes.isNotEmpty()
/** Returns true if there's another proxy to try. */
private fun hasNextProxy(): Boolean = nextProxyIndex < proxies.size
//代码六中,调用了这里,来创建一个Selection(Route 的集合)
@Throws(IOException::class)
operator fun next(): Selection
if (!hasNext()) throw NoSuchElementException()
// Compute the next set of routes to attempt.
val routes = mutableListOf<Route>()
while (hasNextProxy())
//即使没有代理,proxies也会有一个Proxy,此时nextProxyIndex == 0,所以hasNextProxy()为true,进入到这里
// Postponed routes are always tried last. For example, if we have 2 proxies and all the
// routes for proxy1 should be postponed, we'll move to proxy2. Only after we've exhausted
// all the good routes will we attempt the postponed routes.
val proxy = nextProxy()
for (inetSocketAddress in inetSocketAddresses)
val route = Route(address, proxy, inetSocketAddress)
if (routeDatabase.shouldPostpone(route))
//如果这个路由之前失败过,就加入到postponedRoutes 中
postponedRoutes += route
else
// 直接把route 加入routes
routes += route
//如果有没有失败过得route,就终止循环
if (routes.isNotEmpty())
break
//所以的route都是失败过的,就把刚才postponedRoutes 加入到routes
if (routes.isEmpty())
// We've exhausted all Proxies so fallback to the postponed routes.
// 把postponedRoutes
routes += postponedRoutes
postponedRoutes.clear()
//用routes 创建Selection 对象
return Selection(routes)
/** Returns the next proxy to try. May be PROXY.NO_PROXY but never null. */
@Throws(IOException::class)
private fun nextProxy(): Proxy
if (!hasNextProxy())
throw SocketException(
"No route to $address.url.host; exhausted proxy configurations: $proxies")
val result = proxies[nextProxyIndex++]
resetNextInetSocketAddress(result)
return result
/** Prepares the socket addresses to attempt for the current proxy or host. */
@Throws(IOException::class)
private fun resetNextInetSocketAddress(proxy: Proxy)
// Clear the addresses. Necessary if getAllByName() below throws!
val mutableInetSocketAddresses = mutableListOf<InetSocketAddress>()
inetSocketAddresses = mutableInetSocketAddresses
val socketHost: String
val socketPort: Int
if (proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.SOCKS)
//如果是直接相连,或者是SOCKS 直接通过ip 和端口,访问后台
socketHost = address.url.host
socketPort = address.url.port
else
// http 系列的代理
val proxyAddress = proxy.address()
require(proxyAddress is InetSocketAddress)
"Proxy.address() is not an InetSocketAddress: $proxyAddress.javaClass"
socketHost = proxyAddress.socketHost
socketPort = proxyAddress.port
if (socketPort !in 1..65535)
throw SocketException(以上是关于ConnectInterceptor 解析——OkHttp 源码详解的主要内容,如果未能解决你的问题,请参考以下文章
ConnectInterceptor 解析——OkHttp 源码详解
探究OkHttpClient的运行原理(5---ConnectInterceptor)
OkHttp-CallServerInterceptor源码解析
OkHttp-CallServerInterceptor源码解析