Okhttp3 设计思想学习
Posted 不会写代码的丝丽
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Okhttp3 设计思想学习相关的知识,希望对你有一定的参考价值。
前言
Okhttp
的源码其实并不难,难的是对于Http的协议的熟悉程度.比如如何缓存.其实Okhttp
参考了当前很多热门的浏览器源码而编写成的客户端.你可以看到很多线程数的设置都参考了主流浏览器.你可以把Okhttp
视为一个无渲染界面的浏览器即可.
Http缓存
想要看得懂Okhttp
拦截器你首先必须对Http协议自带的缓存机制有一定的了解.
这里给出两个参考
1 HTTP 缓存(建议切换到英文版本)
2 Wikipedia HTTP头字段
3 可能是最被误用的 HTTP 响应头之一 Cache-Control: must-revalidate
4 Cache-Control
- 缓存基础概念:
浏览器会将一些Http响应
存储到本地
,下次再次访问一个地址时,判断缓存是否存在且有效,如果有效直接返回磁盘存储的响应体即可.如果没有缓存重新请求服务器.如果缓存存在但是已经过期,那么进行缓存重校验
:带着一些特殊请求参数询问服务器缓存是否还可用,如果还可以用那么返回状态码304
且不包含实际响应体.如果缓存无效服务端直接重新返回新的资源,状态码为200
你会发现浏览器在处理缓存和重新校验过期缓存是否有效时会大量处理请求头,这里我们给出一个总结图(你可以通过上文的参考文献了解具体的缓存细节):
(可点击放大图片哦)
上面便是一个不太严谨的缓存流程,当然你需要注意的是仅有Get
能被缓存.
Okhttp
也实现上面的一套磁盘缓存流程.
Http1.1协议
我们知道Http1.1
默认实现了保持长连接和复用Socket
的功能,且大多数浏览器对于单个地址(ip和端口相同)套接字仅开启若干个链接.
上图是Chrome
浏览器中的某次请求每一行都是一个socket
.在okhttp
中最大开启5个相同地址套接字.
Http2.0 协议
我们知道Http2.0
有一个多路复用
的概念,请看下图
也就是说Okhttp同样要实现Http2.0
所有特性,当然本文不会分析这块源码.只是让读者在自行分析这块源码可以先了解Http2.0
在进行分析不然你不会明白Okhttp到底在写什么
源码分析
看完上面的前置的知识你会发现Okhttp
要实现的功能真的很庞大,这也是为什么Okhttp
能在众多网络框架脱颖而出的原因.
Okhttp
默认不开启缓存,所以为了分析源码流程你可以开启缓存以及tls/ssl证书方便抓包分析.
这里给出一个网上抄写的代码段:
/**
* Created by PandaQ on 2016/11/10.
* 封装的支持Https连接的Okhttp客户端
* email : 767807368@qq.com
*/
public class HttpsUtils {
//返回一个带缓存和信任所有证书的okhttp
public OkHttpClient getTrustAllClient() {
//添加缓存
Cache cache = new Cache(
new File("todo 缓存路径", "http_cache"),
// $0.05 worth of phone storage in 2020
50L * 1024L * 1024L // 50 MiB
);
OkHttpClient.Builder mBuilder = new OkHttpClient.Builder();
//信任所有证书
mBuilder.sslSocketFactory(createSSLSocketFactory(), mMyTrustManager)
.hostnameVerifier(new TrustAllHostnameVerifier())
.cache(cache);
return mBuilder.build();
}
private MyTrustManager mMyTrustManager;
public SSLSocketFactory createSSLSocketFactory() {
SSLSocketFactory ssfFactory = null;
try {
mMyTrustManager = new MyTrustManager();
SSLContext sc = SSLContext.getInstance("TLS");
sc.init(null, new TrustManager[]{mMyTrustManager}, new SecureRandom());
ssfFactory = sc.getSocketFactory();
} catch (Exception ignored) {
ignored.printStackTrace();
}
return ssfFactory;
}
//实现X509TrustManager接口
public class MyTrustManager implements X509TrustManager {
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}
//实现HostnameVerifier接口
public static class TrustAllHostnameVerifier implements HostnameVerifier {
@Override
public boolean verify(String hostname, SSLSession session) {
return true;
}
}
/**
* 对外提供的获取支持自签名的okhttp客户端
*
* @param certificate 自签名证书的输入流
* @return 支持自签名的客户端
*/
public OkHttpClient getTrusClient(InputStream certificate) {
X509TrustManager trustManager;
SSLSocketFactory sslSocketFactory;
try {
trustManager = trustManagerForCertificates(certificate);
SSLContext sslContext = SSLContext.getInstance("TLS");
//使用构建出的trustManger初始化SSLContext对象
sslContext.init(null, new TrustManager[]{trustManager}, null);
//获得sslSocketFactory对象
sslSocketFactory = sslContext.getSocketFactory();
} catch (GeneralSecurityException e) {
throw new RuntimeException(e);
}
return new OkHttpClient.Builder()
.sslSocketFactory(sslSocketFactory, trustManager)
.build();
}
/**
* 获去信任自签证书的trustManager
*
* @param in 自签证书输入流
* @return 信任自签证书的trustManager
* @throws GeneralSecurityException
*/
private X509TrustManager trustManagerForCertificates(InputStream in)
throws GeneralSecurityException {
CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
//通过证书工厂得到自签证书对象集合
Collection<? extends Certificate> certificates = certificateFactory.generateCertificates(in);
if (certificates.isEmpty()) {
throw new IllegalArgumentException("expected non-empty set of trusted certificates");
}
//为证书设置一个keyStore
char[] password = "password".toCharArray(); // Any password will work.
KeyStore keyStore = newEmptyKeyStore(password);
int index = 0;
//将证书放入keystore中
for (Certificate certificate : certificates) {
String certificateAlias = Integer.toString(index++);
keyStore.setCertificateEntry(certificateAlias, certificate);
}
// Use it to build an X509 trust manager.
//使用包含自签证书信息的keyStore去构建一个X509TrustManager
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(
KeyManagerFactory.getDefaultAlgorithm());
keyManagerFactory.init(keyStore, password);
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(keyStore);
TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
if (trustManagers.length != 1 || !(trustManagers[0] instanceof X509TrustManager)) {
throw new IllegalStateException("Unexpected default trust managers:"
+ Arrays.toString(trustManagers));
}
return (X509TrustManager) trustManagers[0];
}
private KeyStore newEmptyKeyStore(char[] password) throws GeneralSecurityException {
try {
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
InputStream in = null; // By convention, 'null' creates an empty key store.
keyStore.load(null, password);
return keyStore;
} catch (IOException e) {
throw new AssertionError(e);
}
}
}
本文分析okhttp4.9.0
版本.
分发请求事件
我们看一个小例子
fun main() {
val httpsUtils = HttpsUtils()
//得到一个
val okHttpClient = httpsUtils.trustAllClient
val request: Request = Request.Builder()
.url("https://publicobject.com/helloworld.txt")
.build()
//okHttpClient.newCall返回对象为RealCall
val newCall: RealCall = okHttpClient.newCall(request) as RealCall
//同步调用
val response = newCall.execute()
//异步调用
val response2 = newCall.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
}
override fun onResponse(call: Call, response: Response) {
}
})
}
我们知道okhttp有同步调用和异步调用方式如下:
class RealCall(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
//同步调用
override fun execute(): Response {
//..略
try {
//关注点 ------------------1
client.dispatcher.executed(this)
//责任链相关,这里我们先不关心
//这里会执行责任链完成http请求
return getResponseWithInterceptorChain()
} finally {
//完成后移除任务,取出另一个准备运行的任务
client.dispatcher.finished(this)
}
}
//异步调用
override fun enqueue(responseCallback: Callback) {
//关注点 ------------------2
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
}
不管异步还是同步调用都会调用client.dispatcher
相关函数.
我们首先看看同步的情况client.dispatcher.executed(this)
class Dispatcher constructor() {
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private val runningSyncCalls = ArrayDeque<RealCall>()
//可以看到被Synchronized修饰,也就是多线程情况执行保证可见性有序性等
@Synchronized internal fun executed(call: RealCall) {
//放入集合中,这个集合存放所有同步运行的任务
runningSyncCalls.add(call)
}
}
我们同样看看异步相关函数:
client.dispatcher.enqueue(AsyncCall(responseCallback))
class Dispatcher constructor() {
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
// 存放同步运行的任务
private val runningSyncCalls = ArrayDeque<RealCall>()
/** Ready async calls in the order they'll be run. */
//存放所有还未运行的异步任务
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
//正在运行的异步任务
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//存放到准备运行的异步队列中
readyAsyncCalls.add(call)
//判断是否为websocket
//websocket无法共享socket套接字
if (!call.call.forWebSocket) {
//从正在运行的异步队列或者准备运行队列中获取一个相同域名call,
//每个相同域名的请求会共享同一个计数器AtomicInteger实例
//计数器用于统计相同域名请求数量
val existingCall = findExistingCallWithHost(call.host)
//将上一个call的计数器对象放入新的call中
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//判断当前是否可以直接运行call任务而不是在readyAsyncCalls队列中
promoteAndExecute()
}
private fun findExistingCallWithHost(host: String): AsyncCall? {
//从正在运行的异步队列或者准备运行队列中获取一个相同域名call
for (existingCall in runningAsyncCalls) {
if (existingCall.host == host) return existingCall
}
for (existingCall in readyAsyncCalls) {
if (existingCall.host == host) return existingCall
}
return null
}
}
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
//这里Volatile用于保证callsPerHost这个对象赋值后立马对其他线程可见
//AtomicInteger内部计数器已经保证可见性
@Volatile var callsPerHost = AtomicInteger(0)
private set
//共享计数器
fun reuseCallsPerHostFrom(other: AsyncCall) {
this.callsPerHost = other.callsPerHost
}
class Dispatcher constructor() {
private fun promoteAndExecute(): Boolean {
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
//使用粗锁
synchronized(this) {
//待运行的队列
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
//当前运行的http请求数量必须小于64个. var maxRequests = 64
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//callsPerHost为AtomicInteger(0) 同一个域名的共享同一个AtomicInteger对象,是用于统计当前到底有多个相同域名请求,你可以enqueue函数中看到他们是如何共享这个计数器的
//当前判断对同一个域名的请求是否大于5个.maxRequestsPerHost默认为5
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
//能运行到这个证明 符合运行条件,将这个任务取出放入runningAsyncCalls
i.remove()
//这个域名计数器加一
asyncCall.callsPerHost.incrementAndGet()
//放入集合中待会全部交付给线程池执行
executableCalls.add(asyncCall)
//放入运行队列
runningAsyncCalls.add(asyncCall)
}
//runningCallsCount等于 runningAsyncCalls.size + runningSyncCalls.size
isRunning = runningCallsCount() > 0
}
//从集合中取出然后交付给线程池执行
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
//executorService是一个线程池,这里就是讲asyncCall丢到线程池执行
asyncCall.executeOn(executorService)
}
return isRunning
}
}
class internal inner class AsyncCall(
private val responseCallback: Callback
) {
fun executeOn(executorService: ExecutorService) {
//略过无关代码
//丢到线程池执行
executorService.execute(this)
}
}
我们最后看看executorService
线程池
class Dispatcher constructor() {
//实际的线程池对象
private var executorServiceOrNull: ExecutorService? = null
//executorService只是一个代理对象实现了懒加载
//@get:Synchronized防止数据竞争
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
//SynchronousQueue()配合Int.MAX_VALUE可以看到这个线程池是无限制的线程池
//只要来任务就开辟线程或者复用线程进行调度
//这里之所以构造这样的线程池我想是因为Okhttp已经外部严格控制了任务队列了,不需要线程池掌控调度
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
}
当然最后会回调AsyncCall
的run
函数
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
override fun run() {
//threadName是一个工具类,仅仅调用后面的闭包时修改线程名字,调用完修改回线程名字
//这里主要方便调试
threadName("OkHttp ${redactedUrl()}") {
try {
//这里主要构建责任链,责任链负责真正的网络请求,也就是网上看到
//责任链处理完后得到响应结果返回给回调
val response = getResponseWithInterceptorChain()
//这里将结果回调
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
//回调错误
responseCallback.onFailure(this@RealCall, e)
} catch (t: Throwable) {
//其他错误处理
cancel()
throw t
} finally {
//完成请求的后续处理
client.dispatcher.finished(this)
}
}
}
}
}
在分析责任链之前我们看看,请求完成之后是如何处理的,也就是client.dispatcher.finished(this)
class Dispatcher constructor() {
internal fun finished(call: AsyncCall) {
//同域名请求的计数器数量减一
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//从runningAsyncCalls移除这个这个请求
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
//promoteAndExecute前面已经讲过了.取出待运行的任务运行
//如果有运行那么isRunning返回true
val isRunning = promoteAndExecute()
//如果没有运行的请求,那么回调idleCallback
//这个idleCallback是okhttp留给上层开发者自行实现的.以上是关于Okhttp3 设计思想学习的主要内容,如果未能解决你的问题,请参考以下文章
优雅设计封装基于Okhttp3的网络框架:多线程下载添加数据库支持(greenDao)及 进度更新
Java EE 架构设计——基于okhttp3 的网络框架设计
优雅设计封装基于Okhttp3的网络框架:多线程单例模式优化 及 volatile构建者模式使用解析