Okio源码解析
Posted zhuliyuan丶
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Okio源码解析相关的知识,希望对你有一定的参考价值。
Okio是对java原生io的封装,旨在简化api同时优化io操作性能。接下来我会从下面几个方面介绍
- Okio特性概述
- 读写流程源码查看
- Buffer精华操作
- Timeout超时处理
1. Okio特性概述
java已经提供了很多io实现类供我们在不同场景使用,而Okio并不是一种新的io,而是对原生io的一次封装,为的是解决原生io的一些缺陷,下面我们介绍Okio的特性
1.1 api简化
我们知道java的io相关的类非常多,有针对字节和字符的输入输出接口,有实现缓存的Bufferedxxx,以及各种子类实现比如文件的(FileInputStream和FileOutputStream),数据的(DataInputStream和DataOutputStream),对象的(ObjectInputStream和ObjectOutputStream)等等,针对不同的场景我们需要使用不同的类,是非常复杂的。
而Okio简化了这一流程,统一通过Okio这个工厂类初始化,内部是通过重载方法根据传入的参数不同初始化不同的流。
举个板栗
File file = new File(Environment.getExternalStorageState(), "test");
Okio.buffer(Okio.sink(file)).writeString("aaaa", Util.UTF_8).close();
由于传入的参数是File,内部是通过FileOutputStream进行io写操作,并且支持链式操作。
1.2 缓存优化
原生的io缓存比较粗暴,Okio在这方面做了很多优化。
以BufferedOutputStream为例,是利用一个长度为8192的byte数组缓存,当要写入数据长度大于缓存最大容量时跳过缓存直接进行io写操作,当写入数据长度大于缓存数组剩余容量的时候先把缓存写入输出流,再将数据写入缓存,其他情况直接写入缓存。
然后原生的输入输出流之间的buffer是没有办法直接建立联系的,输入流中的数据转移到输出流中是:输入buf -> 临时byte数组 -> 输出buf,经历两次拷贝
//原生
File file = new File(Environment.getExternalStorageState(), "test");
try
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file));
int count = -1;
byte[] array = new byte[1024];//临时byte数组
while ((count = bis.read(array)) != -1)
bos.write(array, 0, count);
bis.close();
bos.close();
catch (FileNotFoundException e)
e.printStackTrace();
catch (IOException e)
e.printStackTrace();
okio的话则是用一个Buffer对象去负责缓存,内部维护了一个Segment双向链表,而这个Segment则是真正的数据缓存载体,内部利用一个长度为8192的byte数组去缓存,当要写入的数据超过8192的时候则会创建多个Segment对象在链表中有序存储。当Segment使用完毕的时候又会利用SegmentPool进行回收,减少Segment对象创建。
对于输入流流到输出流有专门的优化,直接将输入流buffer中segment数据转移到输出流的buffer中只有一次数据的操作,相比原生粗暴的buffer做了很多优化。(而实际操作是,如果是整段的segment数据转移则是直接修改指针指到输出流的buffer上,如果只转移输入流Segment部分数据则根据输出Segment能不能装得下,装得下的话则进行数据拷贝,否则拆分输入流Segment为两个然后将要转移数据的Segment指针指向输出流Buffer,总之Okio在这块做了很多优化,这个后面会细说)
1.3 超时操作
原生进行io操作的时候是阻塞式的,没有超时的处理,除非发生异常才会停止,okio增加了超时处理,推出了Timeout机制,提供了同步超时和异步超时处理。
同步超时Timeout:是在每次进行读写前检测是否超时,如果超时则抛出异常,那如果检测完之后操作阻塞了很久是没法停止的,只有等到下一次操作的时候才会抛出异常停止操作。
异步超时AsyncTimeout:是在每次要进行读写操作的时候创建一个AsymcTimeout对象,然后通过一个链表存储,根据即将超时时间排序,快要超时的排在链表头部,然后启动一个Watchdog线程进行检测,优先从链表头部取出AsyncTimeout判断是否超时,超时了的话则调用AsyncTimeout#timeout()方法。okio给用socket创建流提供了默认实现,timeout的时候直接关闭Socket。
1.4 相关类介绍
- Source:Okio对输入流的抽象接口,提供了read方法将数据读到buffer中
- Sink:Okio对输出流的抽象接口,提供了write方法将数据写到buffer中
- BufferedSource和BufferedSink:是Okio对Buffer的接口抽象,分别继承Source和Sink
- RealBufferedSource:BufferedSource实现类,读操作都是由该类来完成,可以通过Okio工厂获取
- RealBufferedSink:BufferedSink实现类,写操作都是由该类来完成,可以通过Okio工厂获取
- Okio:okio的工厂来用来获取buffer实现类和流的实现类
- Segment:Buffer中存储数据的载体,内部通过一个8K的byte数组存储数据
- SegmentPool:管理Segment创建和回收,最多存储64K的数据也就是8个Segment,当池中存在Segment的时候会复用减少对象的创建
以读写文件为例,读是先文件内容读到buffer中,然后再从buffer中获取数据,写是先写数据到buffer中,然后将将buffer中的数据写入文件,而buffe中数据是通过一个个Segment存储的,Segment则是通过SegmentPool创建和回收,每个类各司其职。
2. 读写流程源码
2.1 读流程
File file = new File(Environment.getExternalStorageState(), "test");
try
BufferedSource source = Okio.buffer(Okio.source(file));
byte[] array = new byte[1024];
source.read(array);
source.close();
catch (Exception e)
e.printStackTrace();
根据上面的例子我们分几段查看源码
- Okio.source()
- Okio.buffer()
- BufferedSource.read()
2.1.1 Okio.source()
//Okio#source
public static Source source(File file) throws FileNotFoundException
if (file == null) throw new IllegalArgumentException("file == null");
return source(new FileInputStream(file));
可以看到由于我们传入的是file所以内部实际是通过FileInputStream读文件
public static Source source(InputStream in)
return source(in, new Timeout());
private static Source source(final InputStream in, final Timeout timeout)
if (in == null) throw new IllegalArgumentException("in == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Source()
@Override public long read(Buffer sink, long byteCount) throws IOException
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (byteCount == 0) return 0;
try
timeout.throwIfReached();//判断是否超时
Segment tail = sink.writableSegment(1);//从buffer中拿到一个可写的Segment
int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);//byteCount是要读取的数据总量,Segment.SIZE - tail.limit是Segment可以装的数据量,取最小值
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);//然后通过FileInputStream将文件数据读到segment的data中
if (bytesRead == -1) return -1;
tail.limit += bytesRead;
sink.size += bytesRead;
return bytesRead;
catch (AssertionError e)
if (isandroidGetsocknameError(e)) throw new IOException(e);
throw e;
@Override public void close() throws IOException
in.close();
@Override public Timeout timeout()
return timeout;
@Override public String toString()
return "source(" + in + ")";
;
上面这段就是创建了一个Source实现类,read的话则是先获取buffer中可写的segment,然后用FileInputStream将文件的数据读到segment中。对于Segment和Buffer在下面会说到
2.1.2 Okio.Buffer()
//Okio#buffer
public static BufferedSource buffer(Source source)
return new RealBufferedSource(source);
由于我们传入的是Source所以会创建一个RealBufferedSource实例,Okio也都是通过它来做读操作,我们先来看下这个类
final class RealBufferedSource implements BufferedSource
public final Buffer buffer = new Buffer();//buffer对象
public final Source source;//Source实现类
RealBufferedSource(Source source)
if (source == null) throw new NullPointerException("source == null");
this.source = source;
@Override
public int readInt() throws IOException
require(4);//最终通过source#read()将数据读到Buffer
return buffer.readInt();//再从Buffer读取数据
@Override
public int read(byte[] sink, int offset, int byteCount) throws IOException
checkOffsetAndCount(sink.length, offset, byteCount);
if (buffer.size == 0)
long read = source.read(buffer, Segment.SIZE);//先将数据读到Buffer
if (read == -1) return -1;
int toRead = (int) Math.min(byteCount, buffer.size);
return buffer.read(sink, offset, toRead);//再从Buffer读取数据
@Override
public String readString(Charset charset) throws IOException
if (charset == null) throw new IllegalArgumentException("charset == null");
buffer.writeAll(source);//最终通过source#read()将数据读到Buffer
return buffer.readString(charset);//再从Buffer读取数据
//后面省略了很多个read方法
可以看到它先将Okio.buffer(Okio.source(file))
传入的Source实现类通过source成员变量存储,然后创建了一个Buffer对象用作缓冲区,而read方法都是一个流程通过source#read()方法将数据写入缓冲区,然后再从缓冲区中读取数据。接下来我们在看下缓冲区Buffer对象
public final class Buffer implements BufferedSource, BufferedSink, Cloneable, ByteChannel
@Nullable Segment head;//缓存数据的载体
long size;//当前缓冲区的数据量单位字节
public Buffer()
//下面省略很多读写缓冲区的操作
Buffer是负责管理缓冲区的对象,内部则是通过一个双向环状链表Segment存储数据,然后我们再来看下Segment是如何存储数据的呢
final class Segment
static final int SIZE = 8192;//最大存储8K数据
final byte[] data;//存储数据数组
int pos;//当前segment数据已经读取到哪了,接下来从pos位置开始读
int limit;//当前segment数据写到哪了,接下来从limit开始写
boolean owner;//当前segment是否允许追加数据,也就是能不能多次写入
Segment next;
Segment prev;
Segment()
this.data = new byte[SIZE];
this.owner = true;//默认可以多次写入
this.shared = false;
可以看到segment内部有一个8k的byte数组来存储数据,pos记录的是segment可读数据位置,(pos-1)到0是已经读过的数据,limit是segment可写数据的位置,limit到Segment.SIZE是剩余可写数据量,pos到limit是还未读取的数据。
ok那我们回到本例中的source.read(array)
@Override public int read(byte[] sink) throws IOException
return read(sink, 0, sink.length);
@Override public int read(byte[] sink, int offset, int byteCount) throws IOException
checkOffsetAndCount(sink.length, offset, byteCount);//数组相关检查
if (buffer.size == 0) //如果缓冲区是空的
long read = source.read(buffer, Segment.SIZE);//将source中的数据读取到buffer中
if (read == -1) return -1;
int toRead = (int) Math.min(byteCount, buffer.size);
return buffer.read(sink, offset, toRead);//将数据写入sink也就是byte数组
对于source#read我们在2.1.1已经看过就是取出Buffer中的Segment,然后将数据写入。不过细节当时我们没说现在来看下
//source#read()
@Override public long read(Buffer sink, long byteCount) throws IOException
try
Segment tail = sink.writableSegment(1);//从buffer中拿到一个可写的Segment
int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);//byteCount是要读取的数据总量,Segment.SIZE - tail.limit是Segment可写数据量,取最小值
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);//然后通过FileInputStream将文件数据读到segment的data数组中
if (bytesRead == -1) return -1;
tail.limit += bytesRead;//修改segment可写位置
sink.size += bytesRead;//修改buffer中数据总量
return bytesRead;
catch (AssertionError e)
if (isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
buffer#writableSegment(1)如何获取Segment的
Segment writableSegment(int minimumCapacity)
if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
if (head == null) //如果buffer中没有Segment
head = SegmentPool.take(); //从SegmentPool中获取
return head.next = head.prev = head;//维护Buffer内部的Segment双向环状链表
Segment tail = head.prev;//拿到最后一个Segment
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) //如果最后一个Segment可写位置limit+需要的最小容量>Segment.SIZE || 该Segment不支持追加写入
tail = tail.push(SegmentPool.take()); // 添加一个新的Segment到尾部
return tail;
如果Buffer中没有Segment则直接从SegmentPool获取,如果有则获取链表尾部也就是最新的Segment,判断数据是否存的下,存不下的话从SegmentPool获取一个新的插到链表尾部
接下来看下SegmentPool#take如何创建Segment的
final class SegmentPool
static final long MAX_SIZE = 64 * 1024; // 64 KiB.最大容量64K
static @Nullable Segment next;//通过一个单链表存储回收的Segment
static long byteCount;//当前SegmentPool容量
private SegmentPool()
static Segment take()
synchronized (SegmentPool.class)
if (next != null) //如果当前有回收的Segment
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
return new Segment(); //否则直接创建
static void recycle(Segment segment)
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
if (segment.shared) return; //当前Segment如果有共享数据不能回收(这个后面说)
synchronized (SegmentPool.class)
if (byteCount + Segment.SIZE > MAX_SIZE) return; //当前SegmentPool满了的话则不能回收
byteCount += Segment.SIZE;//容量增加
segment.next = next;//加到链表中
segment.pos = segment.limit = 0;//可写和可读位置都清零
next = segment;
SegmentPool#take就是看当前的池子中有缓存的Segment的么,有直接使用,没有则创建一个。ok在回到最前面RealBufferedSource#read
@Override
public int read(byte[] sink, int offset, int byteCount) throws IOException
checkOffsetAndCount(sink.length, offset, byteCount);
if (buffer.size == 0)
long read = source.read(buffer, Segment.SIZE);//先将数据读到Buffer
if (read == -1) return -1;
int toRead = (int) Math.min(byteCount, buffer.size);
return buffer.read(sink, offset, toRead);//再从Buffer读取数据
第一块source#read(buffer, Segment.SIZE)已经梳理了一遍就是通过FileInputStream将数据读到Buffer的Segment中,然后再来buffer#read将数据读到byte数组中
@Override public int read(byte[] sink, int offset, int byteCount)
checkOffsetAndCount(sink.length, offset, byteCount);
Segment s = head;//拿到第一个Segment
if (s == null) return -1;
int toCopy = Math.min(byteCount, s.limit - s.pos);//判断要读取的数据量,取byteCount和当前segment可读的数据量s.limit - s.pos
System.arraycopy(s.data, s.pos, sink, offset, toCopy);//将数据拷贝到数组中
s.pos += toCopy;//移动Segment已经读过的数据指针pos
size -= toCopy;//当前Buffer容量减去读过数据量
if (s.pos == s.limit) //如果当前Segment已经读完
head = s.pop()以上是关于Okio源码解析的主要内容,如果未能解决你的问题,请参考以下文章
13 | Android 高级进阶(源码剖析篇) Square 高效易用的 IO 框架 okio
09 | Android 高级进阶(源码剖析篇) Square 高效易用的 IO 框架 okio
12 | Android 高级进阶(源码剖析篇) Square 高效易用的 IO 框架 okio