OKHttp源码分析
Posted 且听真言
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了OKHttp源码分析相关的知识,希望对你有一定的参考价值。
OKHttp作为常用的网络框架,Okhttp是由square公司开发。掌握OKHttp的用法以及它的内部工作原理还是很有必要的。
一、引入方式
1.gradle引入
implementation 'com.squareup.okhttp3:okhttp:3.14.7'
implementation 'com.squareup.okio:okio:1.17.5'
2.权限
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
<uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" />
二、使用方式
步骤:
- 构建OkHttpClient对象。
- 构建Request对象。
- 生成Call对象。
- Call发起同步、异步请求。
2.1 Get请求
1.异步请求
val okHttpClient = OkHttpClient()
val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
.get().build()
val newCall = okHttpClient.newCall(request)
newCall.enqueue(object : Callback
override fun onFailure(call: Call, e: IOException)
e.printStackTrace()
override fun onResponse(call: Call, response: Response)
Log.v(OKHTTP_TAG, response.toString())
Log.v(OKHTTP_TAG, Thread.currentThread().name)
)
2.同步请求
val okHttpClient = OkHttpClient()
val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
.get().build()
val newCall = okHttpClient.newCall(request)
Thread()
try
val execute = newCall.execute()
Log.v(OKHTTP_TAG, execute.toString())
catch (e: Exception)
e.printStackTrace()
.start()
2.2 Post请求
下面的例子是:
1.post请求提交String。
val okHttpClient = OkHttpClient()
val contentType = MediaType.parse("text/x-markdown; charset=utf-8")
val content = "Hello"
val body = RequestBody.create(contentType, content)
val request = Request.Builder()
.url("https://wanandroid.com/wxarticle/chapters/json")
.post(body)
.build()
val newCall = okHttpClient.newCall(request)
newCall.enqueue(object : Callback
override fun onFailure(call: Call, e: IOException)
e.printStackTrace()
override fun onResponse(call: Call, response: Response)
Log.v(OKHTTP_TAG, response.toString())
Log.v(OKHTTP_TAG, Thread.currentThread().name)
)
2.post请求提交表单。
val okHttpClient = OkHttpClient()
val formBody = FormBody.Builder().add("username", "zy")
.add("password", "123")
.build()
val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
.post(formBody)
.build()
val newCall = okHttpClient.newCall(request)
newCall.enqueue(object : Callback
override fun onFailure(call: Call, e: IOException)
e.printStackTrace()
override fun onResponse(call: Call, response: Response)
Log.v(OKHTTP_TAG, response.toString())
Log.v(OKHTTP_TAG, Thread.currentThread().name)
)
3.post请求提交复杂请求体
val okHttpClient = OkHttpClient()
//image file
val imageFile = File(Environment.getExternalStorageDirectory(), "test_ic.png")
//通过RequestBody构建fileBody
val fileBody = RequestBody.create(MediaType.parse("image/jpg"), imageFile)
//MultipartBody构建多种类型(用户名、密码、头像)
val multipartBody = MultipartBody.Builder()
.setType(MultipartBody.FORM)
.addFormDataPart("username", "zy")
.addFormDataPart("phone", "123456")
.addFormDataPart("Test", "test_ic.png", fileBody)
.build()
val getRequest = Request.Builder()
.url("https://wanandroid.com/wxarticle/chapters/json")
.post(multipartBody)
.build()
val newCall = okHttpClient.newCall(getRequest)
newCall.enqueue(object : Callback
override fun onFailure(call: Call, e: IOException)
e.printStackTrace()
override fun onResponse(call: Call, response: Response)
Log.v(OKHTTP_TAG, response.toString())
Log.v(OKHTTP_TAG, Thread.currentThread().name)
)
2.3 请求配置项
下面的例子中设置了请求的超时时长、缓存位置以及大小、监控OKHttp发出的请求(全局配置)
val okHttpClient = OkHttpClient.Builder()
.connectTimeout(15, TimeUnit.SECONDS)
.readTimeout(10, TimeUnit.SECONDS)
.writeTimeout(10, TimeUnit.SECONDS)
.cache(Cache(externalCacheDir, 500 * 1024 * 1024))
.addInterceptor
val request = it.request()
val url = request.url().toString()
Log.v(
OKHTTP_TAG,
"intercept:proceed start: url" + url + ", at " + System.currentTimeMillis()
)
val response = it.proceed(request)
val body = response.body()
Log.v(
OKHTTP_TAG,
"intercept:proceed end: url" + url + ", at " + System.currentTimeMillis()
)
Log.v(OKHTTP_TAG, body?.toString() ?: "")
response
.build()
val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
.get().build()
val newCall = okHttpClient.newCall(request)
newCall.enqueue(object : Callback
override fun onFailure(call: Call, e: IOException)
e.printStackTrace()
override fun onResponse(call: Call, response: Response)
Log.v(OKHTTP_TAG, response.toString())
Log.v(OKHTTP_TAG, Thread.currentThread().name)
)
请求单独的配置
下面的例子中:
使用addHeader()方法添加了请求头。
使用cacheControl(CacheControl.FORCE_NETWORK)设置此次请求是能使用网络,不用缓存。
val okHttpClient = OkHttpClient()
val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
.get()
.addHeader("key", "value")
.cacheControl(CacheControl.FORCE_CACHE)
.build()
val newCall = okHttpClient.newCall(request)
newCall.enqueue(object :Callback
override fun onFailure(call: Call, e: IOException)
e.printStackTrace()
override fun onResponse(call: Call, response: Response)
Log.v(OKHTTP_TAG, response.toString())
Log.v(OKHTTP_TAG, Thread.currentThread().name)
)
三、OKHTTP请求流程
通过一个简单的异步请求分析下OKHttp的请求流程。
val okHttpClient = OkHttpClient()
val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
.get().build()
val newCall = okHttpClient.newCall(request)
newCall.enqueue(object : Callback
override fun onFailure(call: Call, e: IOException)
e.printStackTrace()
override fun onResponse(call: Call, response: Response)
Log.v(OKHTTP_TAG, response.toString())
Log.v(OKHTTP_TAG, Thread.currentThread().name)
)
1.请求的创建
1.1OKHttpClient创建
通过OkHttpClient构造函数构建实例,配置项就是Builder构造方法中默认值。
OKHttpClient.Builder的构造函数
public Builder()
dispatcher = new Dispatcher();//分发器,主要用来执行异步请求时的策略
protocols = DEFAULT_PROTOCOLS;//http协议
connectionSpecs = DEFAULT_CONNECTION_SPECS;//连接配置
eventListenerFactory = EventListener.factory(EventListener.NONE);//请求监听工厂
proxySelector = ProxySelector.getDefault();//代理选择器
if (proxySelector == null)
proxySelector = new NullProxySelector();
cookieJar = CookieJar.NO_COOKIES;//cookie
socketFactory = SocketFactory.getDefault();//Socket工厂
hostnameVerifier = OkHostnameVerifier.INSTANCE;//主机name验证
certificatePinner = CertificatePinner.DEFAULT;//证书链
proxyAuthenticator = Authenticator.NONE;//代理服务器身份验证
authenticator = Authenticator.NONE;//源服务器身份验证
connectionPool = new ConnectionPool();//连接池
dns = Dns.SYSTEM;//dns域名
followSslRedirects = true;//是否遵循 ssl 重定向
followRedirects = true;//是否遵循重定向
retryOnConnectionFailure = true;//连接失败的时候是否重试
callTimeout = 0;//请求超时
connectTimeout = 10_000;//连接超时
readTimeout = 10_000;//读超时
writeTimeout = 10_000;//写超时
pingInterval = 0;//ping间隔时间
1.2Request实例创建
Request也同样是使用Builder模式创建
// 请求链接
HttpUrl url;
// 请求方法
String method;
// 请求头
Headers.Builder headers;
// 请求体
RequestBody body;
// 标签
Map<Class<?>, Object> tags = Collections.emptyMap();
1.3 Call对象的创建
public interface Call extends Cloneable
//请求
Request request();
//同步请求
Response execute() throws IOException;
//异步请求
void enqueue(Callback responseCallback);
//取消请求
void cancel();
//是否在请求过程中
boolean isExecuted();
//请求是否取消
boolean isCanceled();
Timeout timeout();
Call clone();
//工厂接口
interface Factory
Call newCall(Request request);
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket)
//OKHttpClient 实例
this.client = client;
//最初的Request
this.originalRequest = originalRequest;
//是否支持websocket通信
this.forWebSocket = forWebSocket;
Transmitter内部持有OkHttpClient、连接池、call、事件监听器。
public Transmitter(OkHttpClient client, Call call)
this.client = client; //OKHttpClient
//连接池
this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
//Call实例
this.call = call;
//事件监听
this.eventListener = client.eventListenerFactory().create(call);
this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
2.请求调度
2.1异步请求
@Override public void enqueue(Callback responseCallback)
synchronized (this)
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
transmitter.callStart();//回调 请求监听器的开始
client.dispatcher().enqueue(new AsyncCall(responseCallback));//开始调度
首先判断是否已请求过,回调请求开始。然后调用Dispatcher的enqueue方法,传入参数AsyncCall。AsynCall本质是一个Runnable。
final class AsyncCall extends NamedRunnable
... ...
public abstract class NamedRunnable implements Runnable
protected final String name;
public NamedRunnable(String format, Object... args)
this.name = Util.format(format, args);
@Override public final void run()
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try
execute();
finally
Thread.currentThread().setName(oldName);
protected abstract void execute();
#Dispatcher
请求放入双端队列readyAsyncCalls中,再从 正在执行的请求runningAsyncCalls 或 等待执行的请求readyAsyncCalls 中找到是相同host的请求,把callsPerHost重用给当前请求。callsPerHost看名字感觉像是 拥有相同host的请求的数量,类型是AtomicInteger。
void enqueue(AsyncCall call)
synchronized (this)
//将AsyncCall加入等待执行的队列
readyAsyncCalls.add(call);
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.get().forWebSocket)
//从 runningAsyncCalls或者readyAsyncCalls中找到相同host的请求
//相同的 host,使用相同的host calls计数器
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
promoteAndExecute();
//从 runningAsyncCalls或者readyAsyncCalls中找到相同host的请求
@Nullable private AsyncCall findExistingCallWithHost(String host)
for (AsyncCall existingCall : runningAsyncCalls)
if (existingCall.host().equals(host)) return existingCall;
for (AsyncCall existingCall : readyAsyncCalls)
if (existingCall.host().equals(host)) return existingCall;
return null;
#Dispatcher
private int maxRequests = 64; //最多同时请求数为64
private int maxRequestsPerHost = 5;//每个主机最多同时请求数为5
//注意使用的是双端队列
/** 待运行的异步任务队列 */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** 正在运行的异步任务队列*/
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** 正在运行的同步任务队列 */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
private boolean promoteAndExecute()
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this)
//遍历等待执行的异步任务队列
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); )
//得到一个待执行的AsyncCall
AsyncCall asyncCall = i.next();
//当最大并发数小于64以及当前请求的主机的请求数小于5时,添加到处理中的异步请求队列
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
//该host下的calls计数器加1
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
isRunning = runningCallsCount() > 0;
for (int i = 0, size = executableCalls.size(); i < size; i++)
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());//执行请求
return isRunning;
遍历readyAsyncCalls,判断如果runningAsyncCalls数量大于最大并发请求数64就break,或者 相同host请求的数量大于5,就continue。如果不满足上面的两种情况,就从等待队列readyAsyncCalls中移除,callsPerHost自增1,放入 集合executableCalls中,并添加到队列runningAsyncCalls中,表示正在执行的异步请求。readyAsyncCalls的价值在于控制最大并发数的缓冲:异步请求并发数达到64、相同host的异步请求达到5,都要放入等待队列。
#RealCall
void executeOn(ExecutorService executorService)
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try
//通过传入的线程池执行AsyncCall
executorService.execute(this);
success = true;
catch (RejectedExecutionException e)
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
transmitter.noMoreExchanges(ioException);
responseCallback.onFailure(RealCall.this, ioException);//回调失败
finally
if (!success)
client.dispatcher().finished(this); // This call is no longer running!
#Dispatcher
该线程池与CachedThreadPool的线程池 执行请求RealCall非常相似。
public synchronized ExecutorService executorService()
if (executorService == null)
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
return executorService;
#AsyncCall
@Override protected void execute()
boolean signalledCallback = false;
//超时计时
transmitter.timeoutEnter();
try
//获取请求Response
Response response = getResponseWithInterceptorChain();
signalledCallback = true;
//回调请求结果
responseCallback.onResponse(RealCall.this, response);
catch (IOException e)
if (signalledCallback)
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
else
//走失败回调
responseCallback.onFailure(RealCall.this, e);
catch (Throwable t)
cancel();
if (!signalledCallback)
IOException canceledException = new IOException("canceled due to " + t);
canceledException.addSuppressed(t);
// 走失败回调
responseCallback.onFailure(RealCall.this, canceledException);
throw t;
finally
//结束请求
client.dispatcher().finished(this);
调用getResponseWithInterceptorChain()方法来获取Response,使用responseCallback回调结果,最后请求结束调用了dispatcher的finish方法。
3.请求的执行
执行请求最终都会执行到RealCall的getResponseWithInterceptorChain()方法中
Response getResponseWithInterceptorChain() throws IOException
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
//应用拦截器(用户自定义拦截器)
interceptors.addAll(client.interceptors());
//重试及重定向拦截器
interceptors.add(new RetryAndFollowUpInterceptor(client));
//桥接拦截器
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//缓存拦截器
interceptors.add(new CacheInterceptor(client.internalCache()));
//连接拦截器
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket)
//网络拦截器(用户自定义)
interceptors.addAll(client.networkInterceptors());
//请求服务拦截器
interceptors.add(new CallServerInterceptor(forWebSocket));
//拦截器链
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
boolean calledNoMoreExchanges = false;
try
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled())
closeQuietly(response);
throw new IOException("Canceled");
return response;
catch (IOException e)
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
finally
if (!calledNoMoreExchanges)
transmitter.noMoreExchanges(null);
package okhttp3;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
public interface Interceptor
Response intercept(Chain chain) throws IOException;
interface Chain
Request request();
Response proceed(Request request) throws IOException;
@Nullable Connection connection();
Call call();
int connectTimeoutMillis();
Chain withConnectTimeout(int timeout, TimeUnit unit);
int readTimeoutMillis();
Chain withReadTimeout(int timeout, TimeUnit unit);
int writeTimeoutMillis();
Chain withWriteTimeout(int timeout, TimeUnit unit);
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
在实例化RealInterceptorChain时 index赋值是0,exchange是null,首先获取了第一个拦截器,调用了它的interceptor方法,并返回和校验了结果。
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
调用应用拦截器的interceptor方法传入的参数:拦截器链实例next,next就是把index + 1。在应用拦截器中调用的是 next的proceed方法。
最后一个拦截器CallServerInterceptor之外,所有拦截器的interceptor方法都调用了 传入RealInterceptorChain实例。每个拦截器interceptor方法在 调用chain的proceed方法拿到下一个拦截器处理的response,然后返回给上一个拦截器。
4.拦截器
所有的拦截器会组合成一个拦截器链,以典型的责任链模式,依次执行每个拦截器。不同的拦截器有不同的职责,链上的拦截器会按顺序挨个处理,在Request发出之前,Response返回之前,插入一些定制逻辑,这样可以方便的扩展需求。
#RealCall
Response getResponseWithInterceptorChain() throws IOException
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(new RetryAndFollowUpInterceptor(client));
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket)
interceptors.addAll(client.networkInterceptors());
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
boolean calledNoMoreExchanges = false;
try
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled())
closeQuietly(response);
throw new IOException("Canceled");
return response;
catch (IOException e)
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
finally
if (!calledNoMoreExchanges)
transmitter.noMoreExchanges(null);
1.RetryAndFollowUpInterceptor
@Override public Response intercept(Chain chain) throws IOException
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Transmitter transmitter = realChain.transmitter();
int followUpCount = 0;
Response priorResponse = null;
while (true)
//准备连接
transmitter.prepareToConnect(request);
if (transmitter.isCanceled())
throw new IOException("Canceled");
Response response;
boolean success = false;
try
//继续执行下一个Interceptor
response = realChain.proceed(request, transmitter, null);
success = true;
catch (RouteException e)
// The attempt to connect via a route failed. The request will not have been sent.
//连接路由异常,此时请求还未发送。
if (!recover(e.getLastConnectException(), transmitter, false, request))
throw e.getFirstConnectException();
continue;
catch (IOException e)
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
//IO异常,请求可能已经发出。
if (!recover(e, transmitter, requestSendStarted, request)) throw e;
continue;
finally
// The network call threw an exception. Release any resources.
if (!success)
//请求没成功,释放资源。
transmitter.exchangeDoneDueToException();
// Attach the prior response if it exists. Such responses never have a body.
// 关联上一个response
if (priorResponse != null)
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
Exchange exchange = Internal.instance.exchange(response);
Route route = exchange != null ? exchange.connection().route() : null;
//跟进结果,主要作用是根据响应码处理请求,返回Request不为空时进行重定向处理-拿到重定向的request。
//获取响应码判断是否需要重定向
Request followUp = followUpRequest(response, route);
//followUp为空,不需要重试,直接返回
if (followUp == null)
if (exchange != null && exchange.isDuplex())
transmitter.timeoutEarlyExit();
return response;
RequestBody followUpBody = followUp.body();
if (followUpBody != null && followUpBody.isOneShot())
return response;
closeQuietly(response.body());
if (transmitter.hasExchange())
exchange.detachWithViolence();
//最多重试20次
if (++followUpCount > MAX_FOLLOW_UPS)
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
request = followUp;
priorResponse = response;
2.BridgeInterceptor
@Override public Response intercept(Chain chain) throws IOException
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
if (body != null)
MediaType contentType = body.contentType();
if (contentType != null)
requestBuilder.header("Content-Type", contentType.toString());
long contentLength = body.contentLength();
//获取RequestBody的长度,添加Header "Content-Length"
if (contentLength != -1)
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
else
//否则添加"Transfer-Encoding"
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
if (userRequest.header("Host") == null)
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
if (userRequest.header("Connection") == null)
requestBuilder.header("Connection", "Keep-Alive");
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
//"Accept-Encoding: gzip",接收 返回gzip编码压缩的数据
//如果手动添加了"Accept-Encoding: gzip",则不会进入下面的if语句,transparentGzip为false,需要我们自己处理数据解压。
//如果 没有 手动添加“Accept-Encoding: gzip”,transparentGzip是true,就会自动添加,后续也会自动解压处理。
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null)
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
//从cookiejar中获取cookie,添加到Header。
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty())
requestBuilder.header("Cookie", cookieHeader(cookies));
//"User-Agent"需要作为公共Header外部统一添加
if (userRequest.header("User-Agent") == null)
requestBuilder.header("User-Agent", Version.userAgent());
//交给下一个拦截器去处理请求
Response networkResponse = chain.proceed(requestBuilder.build());
//从networkResponse中获取header "Set-Cookie" 存入 cookieJar
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
//如果没有手动添加“Accept-Encoding: gzip”,会自动创建 能自动解压的responseBody-GzipSource
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse))
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
//构建新的 Response 返回出去
return responseBuilder.build();
BridgeInterceptor拦截器,对请求添加了header:“Content-Type”、“Content-Length” 或 “Transfer-Encoding”、“Host”、“Connection”、“Accept-Encoding”、“Cookie”、“User-Agent”,即网络层真正可执行的请求。其中,默认是没有cookie处理的,需要我们在初始化OkhttpClient时配置我们自己的cookieJar。
获取响应后,先把响应header中的cookie存入cookieJar,然后如果没有手动添加请求header “Accept-Encoding: gzip”,那么会通过 创建能自动解压的responseBody——GzipSource,接着构建新的response返回。
3.CacheInterceptor
CacheInterceptor,缓存拦截器,提供网络请求缓存的存取。通过CacheInterceptor合理使用本地缓存,可以有效地减少网络开销、减少响应延迟。
1.了解下http缓存机制:
第一次请求:
第二次请求:
@Override public Response intercept(Chain chain) throws IOException
//用request的url 从缓存中 获取响应 作为候选(CacheStrategy决定是否使用)
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
//根据 request 、 候选response 获取缓存策略
//缓存策略 将决定是否 使用缓存:strategy.networkRequest为 null,不使用网络;strategy.cacheResponse为null,不使用缓存。
//缓存策略 有两种类型
//networkRequest 网络请求
//cacheResponse 缓存的响应
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
//根据缓存策略更新统计指标:请求次数、网络请求次数、使用缓存次数
if (cache != null)
//计算请求次数 和 缓存次数
cache.trackResponse(strategy);
//有缓存 但不能用,关闭
if (cacheCandidate != null && cacheResponse == null)
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
// If we're forbidden from using the network and the cache is insufficient, fail.
//网络请求、缓存 都不能用,就返回 504
//如果 禁止使用网络 并且 缓存不足,返回504和空body的Response
if (networkRequest == null && cacheResponse == null)
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
// If we don't need the network, we're done.
//如果不用网络, cacheResponse肯定不为空了,那么即使用缓存,结束了,不会走后面的拦截器了。
//如果策略中不能使用网络,就把缓存的response封装返回
if (networkRequest == null)
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
//到这里,networkRequest != null(cacheResponse可能为null, 也可能不为null)
//networkRequest != null,就是要进行网络请求了,所以拦截器 就继续往下处理了。
Response networkResponse = null;
try
//调用拦截器链的process从网络获取数据
networkResponse = chain.proceed(networkRequest);
finally
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null)
closeQuietly(cacheCandidate.body());
// If we have a cache response too, then we're doing a conditional get.
//如果网络请求返回304,表示服务端资源没有修改,那么就结合 网络响应和缓存响应,然后更新缓存,返回,结束。
//如果有缓存的Response
if (cacheResponse != null)
//如果网络请求返回 code为304,即说明资源未修改
if (networkResponse.code() == HTTP_NOT_MODIFIED)
//直接封装缓存的Response返回即可
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))//结合header
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())//请求事件
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())//接受事件
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
else
//如果是非304,说明服务端资源有更新,就关闭缓存body
closeQuietly(cacheResponse.body());
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (cache != null)
//网络响应可缓存(请求和响应 的 头 Cache-Control都不是 “no-store”)
//判断是否具有主体 并且 是否可以缓存供后续使用
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest))
// Offer this request to the cache.
// 写入缓存
//加入缓存
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
//OkHttp默认只会对 get请求进行缓存,因为get请求的数据一般比较持久
//而post一般是交互操作,没太大意义进行缓存
//不是get请求就移除缓存
//如果请求方法无效,就从缓存中remove掉
if (HttpMethod.invalidatesCache(networkRequest.method()))
try
cache.remove(networkRequest);
catch (IOException ignored)
// The cache cannot be written.
return response;
我们通常会使用OkHttp 中使用网络缓存,提高访问效率。
- 有网络的时候:短时间内频繁的请求,后面的请求使用缓存中的资源。
- 无网络的时候:获取之前缓存的数据进行暂时的页面显示,当网络更新时对当前activity的数据进行刷新,刷新界面,避免界面空白的场景。
3.ConnectInterceptor
这个拦截器是关于TCP 连接的。连接拦截器(ConnectInterceptor)就是找到一个可用连接,也就是TCP连接,这个连接就是用于HTTP请求和响应的。
- 打开指定服务器的网络连接
- 交给下一个拦截器CallServerInterceptor来处理请求和获取数据的逻辑
@Override public Response intercept(Chain chain) throws IOException
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
Transmitter transmitter = realChain.transmitter();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
// 如果请求是GET格式,需要一些额外的检查
boolean doExtensiveHealthChecks = !request.method().equals("GET");
Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
return realChain.proceed(request, transmitter, exchange);
/** Returns a new exchange to carry a new request and response. */
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks)
synchronized (connectionPool)
if (noMoreExchanges)
throw new IllegalStateException("released");
if (exchange != null)
throw new IllegalStateException("cannot make a new request because the previous response "
+ "is still open: please call response.close()");
// 寻找ExchangeCodec对象
ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
// 通过找到的codec对象构建Exchange对象
Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
// 进行一些变量的赋值
synchronized (connectionPool)
this.exchange = result;
this.exchangeRequestDone = false;
this.exchangeResponseDone = false;
return result;
首先通过 exchangeFinder.find 方法进行了对 ExchangeCodec 的查找,找到对应的 ExchangeCodec 对象,之后通过这个 codec 对象构建了一个 Exchange 对象并返回。
- ExchangeCodec:是一个连接所用的编码解码器,用于编码HTTP请求和解码HTTP响应。
- Exchange:封装这个编码解码器的一个工具类,用于管理ExchangeCodec,处理实际的 I/O。
public ExchangeCodec find(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks)
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try
// findHealthyConnection 方法从而获取 RealConnection 对象
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
//调用了 RealConnection.newCodec 方法获取 ExchangeCodec 对象
return resultConnection.newCodec(client, chain);
catch (RouteException e)
trackFailure();
throw e;
catch (IOException e)
trackFailure();
throw new RouteException(e);
寻找可用连接
我们先看到 findHealthyConnection 方法:
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException
while (true)
//可以看到这里是一个循环,不断地在调用 findConnection 方法寻找连接,
//若找不到 Healthy(可用)的连接,则继续循环直到找到为止。
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool)
if (candidate.successCount == 0 && !candidate.isMultiplexed())
return candidate;
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks))
candidate.noNewExchanges();
continue;
return candidate;
寻找连接的方法:findConnection 方法。
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
RealConnection releasedConnection;
Socket toClose;
synchronized (connectionPool)
if (transmitter.isCanceled()) throw new IOException("Canceled");
hasStreamFailure = false; // This is a fresh attempt.
// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new exchanges.
// 1、复用当前连接,检查这个连接是否可用和可复用
releasedConnection = transmitter.connection;
//如果连接不能创建Stream,则释放资源,返回待关闭的close socket
toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
? transmitter.releaseConnectionNoEvents()
: null;
//证明连接可用
if (transmitter.connection != null)
// We had an already-allocated connection and it's good.
//存在可使用的已分配连接,复用当前连接
result = transmitter.connection;
releasedConnection = null;
//没有可以使用的连接,就去连接池中寻找
if (result == null)
// Attempt to get a connection from the pool.
//2、从连接池中获取可用连接
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false))
foundPooledConnection = true;
result = transmitter.connection;
else if (nextRouteToTry != null)
selectedRoute = nextRouteToTry;
nextRouteToTry = null;
else if (retryCurrentRoute())
selectedRoute = transmitter.connection.route();
closeQuietly(toClose);
//回调
if (releasedConnection != null)
eventListener.connectionReleased(call, releasedConnection);
if (foundPooledConnection)
eventListener.connectionAcquired(call, result);
if (result != null)
// If we found an already-allocated or pooled connection, we're done.
//找到了一个已分配或者连接池中的连接,过程结束,返回该连接
return result;
// If we need a route selection, make one. This is a blocking operation.
//否则,我们需要一个路由信息,这是个阻塞操作
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext()))
newRouteSelection = true;
routeSelection = routeSelector.next();
List<Route> routes = null;
synchronized (connectionPool)
if (transmitter.isCanceled()) throw new IOException("Canceled");
if (newRouteSelection)
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
routes = routeSelection.getAll();
//3、从连接池中获取可用连接(通过一组路由routes),通过更加全面的路由信息,再次从连接池中获取连接
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false))
foundPooledConnection = true;
result = transmitter.connection;
//如果还是没找到,则生成新的连接
if (!foundPooledConnection)
if (selectedRoute == null)
selectedRoute = routeSelection.next();
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
// 4、创建新连接
result = new RealConnection(connectionPool, selectedRoute);
connectingConnection = result;
// If we found a pooled connection on the 2nd time around, we're done.
//如果连接是从连接池中找到,则说明是可复用的。不是新生成的
//如果新生成的连接则需要连接服务器才能使用
if (foundPooledConnection)
eventListener.connectionAcquired(call, result);
return result;
// Do TCP + TLS handshakes. This is a blocking operation.
//走到这说明是新生成的连接
//tcp和tls握手,阻塞操作,连接server
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
//将路由信息添加到routeDatabase的白名单中,证明该路由是可以连接到指定服务器的
connectionPool.routeDatabase.connected(result.route());
//连接合并的最后一次尝试,只有我们尝试多次时才会发生
//同一主机的并发连接
Socket socket = null;
synchronized (connectionPool)
connectingConnection = null;
// Last attempt at connection coalescing, which only occurs if we attempted multiple
// concurrent connections to the same host.
// // 5、再获取一次连接,防止在新建连接过程中有其他竞争连接被创建了
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true))
// We lost the race! Close the connection we created and return the pooled connection.
//关闭创建的连接并返回连接池中的连接
result.noNewExchanges = true;
socket = result.socket();
result = transmitter.connection;
// It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
// that case we will retry the route we just successfully connected with.
// 有可能获得一个不健康的连接,如果是这种情况,将重试刚刚成功连接的路由
nextRouteToTry = selectedRoute;
else
6、使用创建的新连接,放入连接池,并返回
connectionPool.put(result);
transmitter.acquireConnectionNoEvents(result);
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
可用连接的优先级为:当前连接>连接池中的连接>新的连接。
- 如果当前连接可用,则优先选择当前连接
- 如果当前连接不可用,则从连接池中获取连接
- 如果连接池获取失败,则创建一个新的连接,并进行TCP和TSL握手,然后放到连接池中
连接复用可以省去TCP和TLS握手的过程,从而提高网络访问的效率。
获取连接的过程很复杂,大体流程如下:
1、检查当前连接是否可用。通过noNewExchanges判断这个连接是否可用。如果不可用,则会transmitter.connection置为空。当请求失败需要重试的时候或者重定向的时候,这时候连接还在,是可以直接进行复用的。
2、从连接池中获取可用连接。
下面两句代码的区别:
connectionPool.callAcquirePooledConnection(address, call, null, false)
connectionPool.callAcquirePooledConnection(address, call, routes, false)
1.多了一个routes字段。
涉及到HTTP/2的一个技术,叫做 HTTP/2 CONNECTION COALESCING(连接合并)。假设有两个域名,可以解析为相同的IP地址,并且是可以用相同的TLS证书(比如通配符证书),那么客户端可以重用相同的TCP连接从这两个域名中获取资源。
这个routes就是当前域名(主机名)可以被解析的ip地址集合,这两个方法的区别也就是一个传了路由地址,一个没有传。
继续看callAcquirePooledConnection代码:
boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
@Nullable List<Route> routes, boolean requireMultiplexed)
assert (Thread.holdsLock(this));
for (RealConnection connection : connections)
if (requireMultiplexed && !connection.isMultiplexed()) continue;
if (!connection.isEligible(address, routes)) continue;
transmitter.acquireConnectionNoEvents(connection);
return true;
return false;
首先传入的requireMultiplexed为false ,isEligible(Address address, @Nullable List<Route> routes) ,判断主机名、端口号等,如果请求完全相同就直接返回这个连接。 如果主机名不同,还可以判断是不是HTTP/2请求,如果是就继续判断路由地址,证书,如果都能匹配上,那么这个连接也是可用的。
3、创建新连接。
如果没有从连接池中获取到新连接,那么就创建一个新连接,其实就是调用到socket.connect进行TCP连接。
4、再从连接池获取一次连接,防止在新建连接过程中有其他竞争连接被创建了。
因为在创建过程中,有可能有其他的请求和你一起创建了新连接,所以我们需要再去取一次连接,如果有可以用的,就直接用它,防止资源浪费。
其实这里又涉及到HTTP2的一个知识点:多路复用。即不需要当前连接的上一个请求结束之后再去进行下一次请求,只要有连接就可以直接用。HTTP/2中可以保证在同一个域名只建立一路连接,并且可以并发进行请求。
5、新连接放入连接池,并返回。
连接复用池:RealConnectionPool
1. 引用计数
在OkHttp中使用Transmitter类来计数。
计数加一和计数减一的操作其实是在改变List<Reference>列表的大小,List<Reference>的维护类是RealConnection,RealConnection是Socket物理连接的包装。List中的Transmitter弱引用数量就是socket被引用的计数,当计数为0时表示此连接是空闲的。
/** Current calls carried by this connection. */
final List<Reference<Transmitter>> transmitters = new ArrayList<>();
#Transmitter
void acquireConnectionNoEvents(RealConnection connection)
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();
this.connection = connection;
//引用计数加1
connection.transmitters.add(new TransmitterReference(this, callStackTrace));
/**
* Remove the transmitter from the connection's list of allocations. Returns a socket that the
* caller should close.
*/
@Nullable Socket releaseConnectionNoEvents()
assert (Thread.holdsLock(connectionPool));
int index = -1;
for (int i = 0, size = this.connection.transmitters.size(); i < size; i++)
Reference<Transmitter> reference = this.connection.transmitters.get(i);
if (reference.get() == this)
index = i;
break;
if (index == -1) throw new IllegalStateException();
RealConnection released = this.connection;
//引用计数减一
released.transmitters.remove(index);
this.connection = null;
if (released.transmitters.isEmpty())
released.idleAtNanos = System.nanoTime();
if (connectionPool.connectionBecameIdle(released))
return released.socket();
return null;
- 2.RealConnectionPool
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */
//空闲的socket最大连接数
private final int maxIdleConnections;
//socket的keepAlive时间
private final long keepAliveDurationNs;
//双向队列,里面维护了RealConnection也就是socket物理连接的包装
private final Deque<RealConnection> connections = new ArrayDeque<>();
//记录连接失败的route的黑名单,当连接失败的时候就会把失败的线路加进去
final RouteDatabase routeDatabase = new RouteDatabase();
boolean cleanupRunning; //是否正在清理
public final class ConnectionPool
final RealConnectionPool delegate;
public ConnectionPool()
this(5, 5, TimeUnit.MINUTES);
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit)
this.delegate = new RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit);
... ...
这里使用了代理模式,通过ConnectionPool类我们可以发现默认空闲的socket最大连接数为5,socket的保活时间为5分钟,并且在构造ConnectionPool对象时实际构造的是RealConnectionPool对象。
- 3. 缓存操作
3.1 放入连接
void put(RealConnection connection)
assert (Thread.holdsLock(this));
if (!cleanupRunning)
cleanupRunning = true;
//使用线程池执行清理任务
executor.execute(cleanupRunnable);
//将连接添加到双端队列中
connections.add(connection);
放入连接的工作有两个:
- 如果当前连接池没有在清理连接,则先使用线程池执行清理任务并且将正在清理的标志位设置为true
- 将当前连接添加到双端队列中
3.2 清理连接
在放入连接时我们会执行清理连接的操作,会调用线程池执行cleanupRunnable的任务,让我们先看看这个任务
//线程不断调用cleanup来进行清理,并返回下次需要清理的间隔时间
private final Runnable cleanupRunnable = () ->
while (true)
//清理连接,并返回下次需要清理的间隔时间
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0)
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (RealConnectionPool.this)
try
RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
catch (InterruptedException ignored)
;
可以发现一旦清理任务开始执行后,就会每隔指定的间隔时间进行清理连接。
RealConnectionPool#cleanup
long cleanup(long now)
//正在使用的连接数量
int inUseConnectionCount = 0;
//空闲的连接数量
int idleConnectionCount = 0;
//长时间空闲的连接
RealConnection longestIdleConnection = null;
//最长空闲时间
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this)
// 遍历连接
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); )
RealConnection connection = i.next();
//查询此连接的transmitter的引用数量
//如果引用数量大于0,则使用数量inUseConnectionCount加1
//否则闲置数量idleConnectionCount加1
// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0)
inUseConnectionCount++;
continue;
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
//寻找空闲最久的那个连接
//得到空闲时间
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs)
//最长空闲时间
longestIdleDurationNs = idleDurationNs;
//最长空闲连接
longestIdleConnection = connection;
//如果空闲连接的空闲时间超过5分钟,或者空闲连接数超过5个,则移除空闲最久的连接
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections)
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
connections.remove(以上是关于OKHttp源码分析的主要内容,如果未能解决你的问题,请参考以下文章